可扩展固定宽度文件处理模块
xfw的Python项目详细描述
xfw是一个可扩展的固定宽度文件处理模块。
Features
- 字段类型(整数、字符串、日期)独立于 文件结构,可以通过子类化进行扩展。(基场 子类)
- 多字段结构声明(fieldlist类)
- 非同构文件文件结构声明(fieldlistfile)
- 校验和/哈希计算帮助程序(checksumedfile子类)
- 不依赖于行概念(文件可能根本不包含CR/LF字符 连续字段集之间)
Missing features / bugs
- 字符串构造是多字节的(utf-8,…)不可知的,并且会无意识地剪切 在任何实体的中间 如果字段是以某种编码的字符数定义的,只需使用 为xfw提供unicode对象,并在其外部进行代码转换。见 codecs标准模块。
- 正确的接口声明
- 默认情况下,解析时应强制转换字段(integerfield、datetimefield)
- 字段列表总长度应设置为可选,并且仅用于 在记录结束时自动生成无注释填充 单个字段长度。
Example
dislaimer:给出的文件格式纯粹是假设的,不是来自任何规范的 我知道,不应该作为一个指南,而应该作为xfw的展示。 能力。
假设一个文件由一个通用头组成,其中包含 常量值5个字符的标识符,一个3个字符的整数,表示记录的数目 包含,和可选的20个字符的注释。之后是记录,组成 由日期(yyyymmdd)、行类型(2字符整数)组成的头本身 和行数(2个字符的整数),然后是行。行类型全部开始 带时间(hhmms),后跟取决于行类型的字段:
- 类型1:10个字符的字符串
- 类型2:一个2字符的整数,8字符的填充,一个1字符的整数
要将以下代码作为doctest运行,请运行:
python -m doctest README.rst
声明所有文件结构:
>>> import xfw >>> ROOT_HEADER = xfw.FieldList([ ... (xfw.StringField(5), True, 'header_id'), ... (xfw.IntegerField(3, cast=True), True, 'block_count'), ... (xfw.StringField(15), False, 'comment'), ... ], 23, fixed_value_dict={ ... 'header_id': 'HEAD1', ... }) >>> BLOCK_HEADER = xfw.FieldList([ ... (xfw.DateTimeField('%Y%m%d', cast=True), True, 'date'), ... (xfw.IntegerField(2, cast=True), True, 'row_type'), ... (xfw.IntegerField(2, cast=True), True, 'row_count'), ... ], 12) >>> ROW_BASE = xfw.FieldList([ ... (xfw.DateTimeField('%H%M%S', cast=True), True, 'time'), ... ], 6) >>> ROW_TYPE_DICT = { ... 1: xfw.FieldList([ ... ROW_BASE, ... (xfw.StringField(10), True, 'description'), ... ], 16), ... 2: xfw.FieldList([ ... ROW_BASE, ... (xfw.IntegerField(2, cast=True), True, 'some_value'), ... (xfw.StringField(8), False, None), # annonymous padding ... (xfw.IntegerField(1, cast=True), True, 'another_value'), ... ], 17), ... } >>> def blockCallback(head, item_list=None): ... if item_list is None: ... row_count = head['row_count'] ... else: ... row_count = len(item_list) ... return row_count, ROW_TYPE_DICT[head['row_type']] >>> FILE_STRUCTURE = xfw.ConstItemTypeFile( ... ROOT_HEADER, ... 'block_count', ... xfw.FieldListFile( ... BLOCK_HEADER, ... blockCallback, ... separator='\n', ... ), ... separator='\n', ... )
通过哈希助手包装器(sha1)解析示例文件:
>>> from cStringIO import StringIO >>> sample_file = StringIO( ... 'HEAD1002blah \n' ... '201112260101\n' ... '115500other str \n' ... '201112260201\n' ... '11550099 8' ... ) >>> from datetime import datetime >>> checksumed_wrapper = xfw.SHA1ChecksumedFile(sample_file) >>> parsed_file = FILE_STRUCTURE.parseStream(checksumed_wrapper) >>> parsed_file == \ ... ( ... { ... 'header_id': 'HEAD1', ... 'block_count': 2, ... 'comment': 'blah', ... }, ... [ ... ( ... { ... 'date': datetime(2011, 12, 26, 0, 0), ... 'row_type': 1, ... 'row_count': 1, ... }, ... [ ... { ... 'time': datetime(1900, 1, 1, 11, 55), ... 'description': 'other str', ... }, ... ] ... ), ... ( ... { ... 'date': datetime(2011, 12, 26, 0, 0), ... 'row_type': 2, ... 'row_count': 1, ... }, ... [ ... { ... 'time': datetime(1900, 1, 1, 11, 55), ... 'some_value': 99, ... 'another_value': 8, ... }, ... ] ... ), ... ], ... ) True
验证SHA1是否正确累积:
>>> import hashlib >>> hashlib.sha1(sample_file.getvalue()).hexdigest() == checksumed_wrapper.getHexDigest() True
从已分析的数据生成文件(如上所述已验证正确):
>>> generated_stream = StringIO() >>> FILE_STRUCTURE.generateStream(generated_stream, parsed_file) >>> generated_stream.getvalue() == sample_file.getvalue() True
同样,使用unicode对象并生成不同二进制文件的流 长度,尽管包含相同数量的实体。请注意 格式声明中定义的固定值是可选的(例如:header_id), 依赖值是自动计算的(例如:block_count)。
使用符合单个utf-8编码字节的unicode字符生成:
>>> import codecs >>> encoded_writer = codecs.getwriter('UTF-8') >>> input_data = ( ... { ... 'comment': u'Just ASCII', ... }, ... [], ... ) >>> sample_file = StringIO() >>> FILE_STRUCTURE.generateStream(encoded_writer(sample_file), input_data) >>> sample_file.getvalue() 'HEAD1000Just ASCII ' >>> len(sample_file.getvalue()) 23
再次生成,编码时需要更多字节的字符,并演示 校验和生成:
>>> wide_input_data = ( ... { ... 'comment': u'\u3042\u3044\u3046\u3048\u304a\u304b\u304d\u304f\u3051\u3053\u3055\u3057\u3059\u305b\u305d', ... }, ... [], ... ) >>> wide_sample_file = StringIO() >>> checksumed_wrapper = xfw.SHA1ChecksumedFile(wide_sample_file) >>> FILE_STRUCTURE.generateStream(encoded_writer(checksumed_wrapper), wide_input_data) >>> wide_sample_file.getvalue() 'HEAD1000\xe3\x81\x82\xe3\x81\x84\xe3\x81\x86\xe3\x81\x88\xe3\x81\x8a\xe3\x81\x8b\xe3\x81\x8d\xe3\x81\x8f\xe3\x81\x91\xe3\x81\x93\xe3\x81\x95\xe3\x81\x97\xe3\x81\x99\xe3\x81\x9b\xe3\x81\x9d' >>> len(wide_sample_file.getvalue()) 53 >>> hashlib.sha1(wide_sample_file.getvalue()).hexdigest() == checksumed_wrapper.getHexDigest() True
不过,两者都解析为各自的原始数据:
>>> encoded_reader = codecs.getreader('UTF-8') >>> FILE_STRUCTURE.parseStream(encoded_reader(StringIO(sample_file.getvalue())))[0]['comment'] u'Just ASCII' >>> FILE_STRUCTURE.parseStream(encoded_reader(StringIO(wide_sample_file.getvalue())))[0]['comment'] u'\u3042\u3044\u3046\u3048\u304a\u304b\u304d\u304f\u3051\u3053\u3055\u3057\u3059\u305b\u305d'