如何将Multiprocessing.Pool的结果流式写入csv?

2 投票
3 回答
3010 浏览
提问于 2025-04-18 11:22

我有一个用Python(2.7)写的程序,它接收一个关键字,进行一系列计算,然后返回一个结果列表。下面是一个非常简单的版本。

我使用多进程来创建线程,这样可以更快地处理数据。不过,我的生产数据有几百万行,每次循环的时间越来越长。上次运行时,每次循环花了超过6分钟,而一开始只需要一秒钟或更少。我觉得这是因为所有线程都在往结果集中添加结果,结果集不断增长,直到包含所有记录。

有没有办法使用多进程将每个线程的结果(一个列表)流式传输到CSV文件中,或者批量结果集中,这样在达到一定行数后就可以写入CSV文件?

如果有其他加速或优化的方法,也非常感谢分享。

import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool

global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop,(key for key in keys) )

        loaddata = []
        for sublist in resultset:
            loaddata.append(sublist)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in loaddata:
                writer.writerow(listitem)
        file.close

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

3 个回答

0

我觉得一次性处理大结构并使用追加的方式会让它变得很慢。我通常的做法是打开和处理器数量一样多的文件,然后用取余的方法立即写入每个文件。这样可以避免所有数据流都写到同一个文件里导致的错误,同时也不需要存储大量数据。可能这不是最好的解决方案,但确实很简单。最后你只需要把结果合并回来就行了。

在运行开始时定义:

num_cores = 8
file_sep = ","
outFiles = [open('out' + str(x) + ".csv", "a") for x in range(num_cores)]

然后在 key_loop 函数中:

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    outFiles[key % num_cores].write(file_sep.join([str(x) for x in test_list]) 
                                    + "\n")

之后,别忘了关闭文件:[x.close() for x in outFiles]

改进建议:

  • 像评论中提到的那样,按块处理数据。一次写/处理一行会比按块写要慢得多。

  • 处理错误(关闭文件)

  • 重要提示:我不太确定“keys”变量的意思,但那里的数字会导致取余无法确保每个进程都写入到各自的流中(比如12个keys,取余8会让2个进程写到同一个文件里)

2

下面这段非常简单的代码可以把很多工人的数据收集到一个CSV文件里。每个工人会接收一个键,然后返回一系列的数据行。父进程会同时处理多个键,使用多个工人。当每个键的处理完成后,父进程会把输出的数据行按顺序写入CSV文件。

要注意顺序。如果每个工人直接写入CSV文件,数据可能会乱七八糟,或者会互相覆盖。让每个工人写入自己的CSV文件虽然速度快,但最后还得把所有的数据文件合并在一起。

源代码

import csv, multiprocessing, sys

def worker(key):
    return [ [key, 0], [key+1, 1] ]


pool = multiprocessing.Pool()   # default 1 proc per CPU
writer = csv.writer(sys.stdout)

for resultset in pool.imap(worker, [1,2,3,4]):
    for row in resultset:
        writer.writerow(row)

输出结果

1,0
2,1
2,0
3,1
3,0
4,1
4,0
5,1
4

这里有一个答案,汇总了Eevee和我提出的建议。

import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool

keys = [1,2,3,4,5,6,7,8,9,10,11,12]

def key_loop(key):
    test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
    test_list = test_df.ix[0].tolist()
    return test_list

if __name__ == "__main__":
    try:
        pool = Pool(processes=8)      
        resultset = pool.imap(key_loop, keys, chunksize=200)

        with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
            writer = csv.writer(file)
            for listitem in resultset:
                writer.writerow(listitem)

        print "finished load"
    except:
        print 'There was a problem multithreading the key Pool'
        raise

再次强调,这里有几个改动:

  1. 直接遍历 resultset,而不是先把它复制到一个列表里,这样可以省去不必要的步骤。
  2. 直接把 keys 列表传给 pool.imap,而不是先把它变成生成器。
  3. imap 提供一个比默认值1更大的 chunksize。更大的 chunksize 可以减少在进程之间传递 keys 中的值时所需的通信成本,这样在 keys 很大的情况下(就像你这个例子一样)可以显著提高性能。你可以尝试不同的 chunksize 值(比如试试比200大得多的值,比如5000等),看看对性能的影响。我猜200可能不够,但肯定比1要好。

撰写回答