Pytables/Pandas:合并(读取?)按行划分的多个HDF5存储
在“写一次,读多次”的工作流程中,我经常需要解析从Teradata导出的超大文本文件(20GB到60GB),然后使用Pandas将这些文件加载到Pytables中。我使用多进程来将文本文件分块,并把它们分发到不同的进程中,以便根据行数将每个文件分成大约500万行的.H5文件,这样可以支持并行写入。这样做的速度相当快,多个hdf5文件并行写入大约需要12分钟,而写入一个包含2500万行和64列的单个hdf5文件则需要22分钟。
%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop
%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop
在写入多个按行分割的h5文件的情况下,我最终会得到多个结构相同的文件,我希望将它们合并成一个单一的h5文件,路径为root/data/table
为了测试合并功能,这里有一段代码示例:
import tables as tb
import pandas as pd
tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')
filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']
for f in filenames:
s=pd.HDFStore(f)
df=s.select('data')
store.append(key='data',value=df,format='t',chunksize=200000)
store.close()
这是这个操作的%timeit结果:
1 loops, best of 3: 8min 22s per loop
这基本上消耗了我通过并行写入多个h5文件所节省的大部分时间。我有两个问题:
有没有更有效的方法来合并(追加)具有相同表格式的h5文件?(类似SQL的Union功能)。我尝试过这个,但没能成功追加表格。
如果没有,按行分割是否合理,尤其是当大多数查询都是针对所有列的选择时?我在考虑写一个map/combine函数,来查找表的所有部分以处理选择查询。Pandas的select_as_multiple()函数可以根据列进行分割。
根据Jeff的建议更新:
非常感谢你提到在合并前的文件写入过程中去掉索引和压缩。去掉索引和压缩后,将每个合并前文件的最大行数设置为100万行:
%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop
这比之前快了两分钟多,几乎是我能解析数据的最快速度。在将数据列设置为所需字段(在我这里是3个字段)后:
for f in filenames:
s=pd.HDFStore(f)
df=s.select('data')
dc=df.columns[1:4]
store.append(key='data',value=df,format='t',data_columns=dc)
这比之前慢了大约两分钟:1 loops, best of 3: 10min 23s per loop
。在去掉上面代码中的压缩后,我得到了1 loops, best of 3: 8min 48s per loop
(几乎和第一次尝试的结果相同,都是没有数据列索引的压缩)。为了让你了解压缩效果如何,未压缩的存储大约是13.5GB,而使用blosc
压缩后的版本大约是3.7GB。
总的来说,我的处理过程需要18分钟15秒
来创建一个合并的未压缩hdf5文件。与单个文件写入(压缩)相比,快了大约4分钟7秒
。
这让我想到了我的第二个问题,如果我不合并文件,而是使用合并前的文件以map/combine的方式处理,这样的做法是否合理?我应该如何考虑实现这个?
为了完全透明,我使用的Pandas版本是0.12.0
,Pytables版本是3.0.0
,我的数据处理工作流程如下(伪代码):
def generate_chunks_from_text_file(reader,chunksize=50000):
""" generator that yields processed text chunks """
for i, line in enumerate(reader.readlines()):
----process data and yield chunk -----
def data_reader(reader,queue):
""" read data from file and put it into a queue for multiprocessing """
for chunk in self.generate_chunks_from_text_file(reader):
queue.put(chunk) # put data in the queue for the writer
def data_processor(queue,filename,dtype,min_size):
"""" subprocess that reads the next value in the queue and writes hdf store. """
store=pd.HDFStore(filename)
while True:
results = queue.get()
array=np.array(results,dtype=dt) # convert to numpy array
df = pd.DataFrame(array) #covert to pandas array
store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
store.close()
----when queue exhausts - break-----
1 个回答
我使用一种类似的方式,先分开处理再合并,利用多个进程来创建中间文件,然后用一个进程来合并这些文件。这里有一些提高性能的小建议:
在写文件的时候,可以通过设置
index=False
来关闭索引,具体可以查看 这里 的文档。我认为PyTables
会逐步更新索引,而在这种情况下完全没有必要(因为你之后会合并这些文件)。只对最终的文件建立索引。这样可以大大加快写入速度。你可以考虑根据你的查询需求,改变默认的索引方式或级别(假设你遵循下面几个建议,不要创建太多数据列)。
同样的道理,在写预合并文件的时候,不要创建压缩文件,而是在写完索引文件后再创建(以未压缩的状态),这应该是你的最后一步。具体可以查看 这里 的文档。此外,使用
ptrepack
时,务必要传入--chunkshape=auto
,这样可以重新计算 PyTables 的块大小(比如一次读取或写入多少数据),它会考虑整个表格。关于压缩,效果因数据而异,取决于你的数据压缩效果如何,以及你进行的查询类型。我发现有些数据不压缩反而更快,尽管理论上压缩应该更好。你需要自己尝试(不过我总是使用
blosc
)。Blosc 只有一个压缩级别(1-9 级别是开启的,0 级别是关闭的)。所以改变这个不会有任何效果。我按照索引顺序合并文件,基本上是先把一部分预合并文件读入内存(保持一个固定的数量,以使用固定的内存),然后一个一个地追加到最终文件中。(不太确定这样是否真的有区别,但似乎效果不错)。
你会发现大部分时间都花在
创建
索引上。而且,只索引你实际需要的列!在写每个文件时,确保指定
data_columns=a_small_subset_of_columns
。我发现写很多小文件再合并成一个大文件效果更好,而不是写几个大文件,但这可能因人而异。(比如说写 100 个 100MB 的预合并文件,最后得到一个 10GB 的文件,而不是 5 个 2GB 的文件)。不过这可能与我的处理流程有关,因为我通常在处理上会遇到瓶颈,而不是实际的写入。
我没有使用过,但听说使用 SSD(固态硬盘)效果非常好,即使它相对较小。在这种情况下,使用 SSD 可以大幅提升速度(压缩可能会改变这个结果)。