如何将大文件的数据分块以进行多进程处理?

19 投票
2 回答
15921 浏览
提问于 2025-04-17 09:28

我正在尝试使用多进程来加速一个应用程序,这个程序需要处理一个非常大的CSV文件(大小在64MB到500MB之间),逐行进行一些操作,然后输出一个小的、固定大小的文件。

目前,我使用了 list(file_obj),但这会把整个文件都加载到内存中(我想是这样),然后我把这个列表分成n个部分,n就是我想要运行的进程数量。接着,我对这些分好的列表使用 pool.map()

这样做的运行时间比起单线程的方式(就是直接打开文件然后逐行处理)要差得多。有人能给我推荐一个更好的解决方案吗?

另外,我需要按组处理文件中的行,这些组要保持某一列的值不变。这些行的组可以被拆分,但每组中不能有超过一个该列的值。

2 个回答

2

我建议保持简单。可以用一个程序打开文件,然后一行一行地读取内容。你可以决定把文件分成多少个部分,打开相应数量的输出文件,然后把每一行写入下一个文件。这样就能把文件分成n个相等的部分。接着,你可以同时对每个文件运行一个Python程序。

20

file_obj文件很大的时候,使用list(file_obj)会占用很多内存。我们可以通过使用itertools来按需提取一些行,从而减少内存的使用。

具体来说,我们可以使用

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)

将文件分成可以处理的小块,然后

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)

让多进程池同时处理num_chunks个小块。

这样一来,我们只需要大约足够的内存来存放几个(num_chunks)小块,而不是整个文件。


import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)  

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()

撰写回答