Pytables/Pandas:合并(读取?)按行划分的多个HDF5存储

7 投票
1 回答
6889 浏览
提问于 2025-04-18 00:35

在“写一次,读多次”的工作流程中,我经常需要解析从Teradata导出的超大文本文件(20GB到60GB),然后使用Pandas将这些文件加载到Pytables中。我使用多进程来将文本文件分块,并把它们分发到不同的进程中,以便根据行数将每个文件分成大约500万行的.H5文件,这样可以支持并行写入。这样做的速度相当快,多个hdf5文件并行写入大约需要12分钟,而写入一个包含2500万行和64列的单个hdf5文件则需要22分钟。

%timeit -n 1 write_single_hdf_multiprocess()
1 loops, best of 3: 22min 42s per loop

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 12min 12s per loop

在写入多个按行分割的h5文件的情况下,我最终会得到多个结构相同的文件,我希望将它们合并成一个单一的h5文件,路径为root/data/table

为了测试合并功能,这里有一段代码示例:

import tables as tb
import pandas as pd

tb.setBloscMaxThreads(15)
store =pd.HDFStore('temp15.h5',complib='blosc')

filenames=['part_1.h5','part_2.h5','part_3.h5','part_4.h5','part_5.h5']

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    store.append(key='data',value=df,format='t',chunksize=200000)

store.close()

这是这个操作的%timeit结果:

1 loops, best of 3: 8min 22s per loop

这基本上消耗了我通过并行写入多个h5文件所节省的大部分时间。我有两个问题:

  1. 有没有更有效的方法来合并(追加)具有相同表格式的h5文件?(类似SQL的Union功能)。我尝试过这个,但没能成功追加表格。

  2. 如果没有,按行分割是否合理,尤其是当大多数查询都是针对所有列的选择时?我在考虑写一个map/combine函数,来查找表的所有部分以处理选择查询。Pandas的select_as_multiple()函数可以根据列进行分割。


根据Jeff的建议更新:

非常感谢你提到在合并前的文件写入过程中去掉索引和压缩。去掉索引和压缩后,将每个合并前文件的最大行数设置为100万行:

%timeit -n 1 write_multiple_hdf_multiprocess()
1 loops, best of 3: 9min 37s per loop

这比之前快了两分钟多,几乎是我能解析数据的最快速度。在将数据列设置为所需字段(在我这里是3个字段)后:

for f in filenames:
    s=pd.HDFStore(f)
    df=s.select('data')
    dc=df.columns[1:4]
    store.append(key='data',value=df,format='t',data_columns=dc)

这比之前慢了大约两分钟:1 loops, best of 3: 10min 23s per loop。在去掉上面代码中的压缩后,我得到了1 loops, best of 3: 8min 48s per loop(几乎和第一次尝试的结果相同,都是没有数据列索引的压缩)。为了让你了解压缩效果如何,未压缩的存储大约是13.5GB,而使用blosc压缩后的版本大约是3.7GB。

总的来说,我的处理过程需要18分钟15秒来创建一个合并的未压缩hdf5文件。与单个文件写入(压缩)相比,快了大约4分钟7秒

这让我想到了我的第二个问题,如果我不合并文件,而是使用合并前的文件以map/combine的方式处理,这样的做法是否合理?我应该如何考虑实现这个?

为了完全透明,我使用的Pandas版本是0.12.0,Pytables版本是3.0.0,我的数据处理工作流程如下(伪代码):

def generate_chunks_from_text_file(reader,chunksize=50000):
    """ generator that yields processed text chunks """

    for i, line in enumerate(reader.readlines()):
        ----process data and yield chunk -----


def data_reader(reader,queue):
    """ read data from file and put it into a queue for multiprocessing """

    for chunk in self.generate_chunks_from_text_file(reader):
        queue.put(chunk) # put data in the queue for the writer

def data_processor(queue,filename,dtype,min_size):
    """" subprocess that reads the next value in the queue and writes hdf store. """

    store=pd.HDFStore(filename)

    while True:

        results = queue.get()
        array=np.array(results,dtype=dt) # convert to numpy array
        df = pd.DataFrame(array) #covert to pandas array

        store.append(key='data', value=df, format='t', min_itemsize=dict(min_size), data_columns=[],index=False)
    store.close()
        ----when queue exhausts - break-----

1 个回答

7

我使用一种类似的方式,先分开处理再合并,利用多个进程来创建中间文件,然后用一个进程来合并这些文件。这里有一些提高性能的小建议:

  • 在写文件的时候,可以通过设置 index=False 来关闭索引,具体可以查看 这里 的文档。我认为 PyTables 会逐步更新索引,而在这种情况下完全没有必要(因为你之后会合并这些文件)。只对最终的文件建立索引。这样可以大大加快写入速度。

  • 你可以考虑根据你的查询需求,改变默认的索引方式或级别(假设你遵循下面几个建议,不要创建太多数据列)。

  • 同样的道理,在写预合并文件的时候,不要创建压缩文件,而是在写完索引文件后再创建(以未压缩的状态),这应该是你的最后一步。具体可以查看 这里 的文档。此外,使用 ptrepack 时,务必要传入 --chunkshape=auto,这样可以重新计算 PyTables 的块大小(比如一次读取或写入多少数据),它会考虑整个表格。

  • 关于压缩,效果因数据而异,取决于你的数据压缩效果如何,以及你进行的查询类型。我发现有些数据不压缩反而更快,尽管理论上压缩应该更好。你需要自己尝试(不过我总是使用 blosc)。Blosc 只有一个压缩级别(1-9 级别是开启的,0 级别是关闭的)。所以改变这个不会有任何效果。

  • 我按照索引顺序合并文件,基本上是先把一部分预合并文件读入内存(保持一个固定的数量,以使用固定的内存),然后一个一个地追加到最终文件中。(不太确定这样是否真的有区别,但似乎效果不错)。

  • 你会发现大部分时间都花在 创建 索引上。

  • 而且,只索引你实际需要的列!在写每个文件时,确保指定 data_columns=a_small_subset_of_columns

  • 我发现写很多小文件再合并成一个大文件效果更好,而不是写几个大文件,但这可能因人而异。(比如说写 100 个 100MB 的预合并文件,最后得到一个 10GB 的文件,而不是 5 个 2GB 的文件)。不过这可能与我的处理流程有关,因为我通常在处理上会遇到瓶颈,而不是实际的写入。

  • 我没有使用过,但听说使用 SSD(固态硬盘)效果非常好,即使它相对较小。在这种情况下,使用 SSD 可以大幅提升速度(压缩可能会改变这个结果)。

撰写回答