如何将Multiprocessing.Pool的结果流式写入csv?
我有一个用Python(2.7)写的程序,它接收一个关键字,进行一系列计算,然后返回一个结果列表。下面是一个非常简单的版本。
我使用多进程来创建线程,这样可以更快地处理数据。不过,我的生产数据有几百万行,每次循环的时间越来越长。上次运行时,每次循环花了超过6分钟,而一开始只需要一秒钟或更少。我觉得这是因为所有线程都在往结果集中添加结果,结果集不断增长,直到包含所有记录。
有没有办法使用多进程将每个线程的结果(一个列表)流式传输到CSV文件中,或者批量结果集中,这样在达到一定行数后就可以写入CSV文件?
如果有其他加速或优化的方法,也非常感谢分享。
import numpy as np
import pandas as pd
import csv
import os
import multiprocessing
from multiprocessing import Pool
global keys
keys = [1,2,3,4,5,6,7,8,9,10,11,12]
def key_loop(key):
test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
test_list = test_df.ix[0].tolist()
return test_list
if __name__ == "__main__":
try:
pool = Pool(processes=8)
resultset = pool.imap(key_loop,(key for key in keys) )
loaddata = []
for sublist in resultset:
loaddata.append(sublist)
with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
writer = csv.writer(file)
for listitem in loaddata:
writer.writerow(listitem)
file.close
print "finished load"
except:
print 'There was a problem multithreading the key Pool'
raise
3 个回答
我觉得一次性处理大结构并使用追加的方式会让它变得很慢。我通常的做法是打开和处理器数量一样多的文件,然后用取余的方法立即写入每个文件。这样可以避免所有数据流都写到同一个文件里导致的错误,同时也不需要存储大量数据。可能这不是最好的解决方案,但确实很简单。最后你只需要把结果合并回来就行了。
在运行开始时定义:
num_cores = 8
file_sep = ","
outFiles = [open('out' + str(x) + ".csv", "a") for x in range(num_cores)]
然后在 key_loop 函数中:
def key_loop(key):
test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
test_list = test_df.ix[0].tolist()
outFiles[key % num_cores].write(file_sep.join([str(x) for x in test_list])
+ "\n")
之后,别忘了关闭文件:[x.close() for x in outFiles]
改进建议:
像评论中提到的那样,按块处理数据。一次写/处理一行会比按块写要慢得多。
处理错误(关闭文件)
重要提示:我不太确定“keys”变量的意思,但那里的数字会导致取余无法确保每个进程都写入到各自的流中(比如12个keys,取余8会让2个进程写到同一个文件里)
下面这段非常简单的代码可以把很多工人的数据收集到一个CSV文件里。每个工人会接收一个键,然后返回一系列的数据行。父进程会同时处理多个键,使用多个工人。当每个键的处理完成后,父进程会把输出的数据行按顺序写入CSV文件。
要注意顺序。如果每个工人直接写入CSV文件,数据可能会乱七八糟,或者会互相覆盖。让每个工人写入自己的CSV文件虽然速度快,但最后还得把所有的数据文件合并在一起。
源代码
import csv, multiprocessing, sys
def worker(key):
return [ [key, 0], [key+1, 1] ]
pool = multiprocessing.Pool() # default 1 proc per CPU
writer = csv.writer(sys.stdout)
for resultset in pool.imap(worker, [1,2,3,4]):
for row in resultset:
writer.writerow(row)
输出结果
1,0
2,1
2,0
3,1
3,0
4,1
4,0
5,1
这里有一个答案,汇总了Eevee和我提出的建议。
import numpy as np
import pandas as pd
import csv
from multiprocessing import Pool
keys = [1,2,3,4,5,6,7,8,9,10,11,12]
def key_loop(key):
test_df = pd.DataFrame(np.random.randn(1,4), columns=['a','b','c','d'])
test_list = test_df.ix[0].tolist()
return test_list
if __name__ == "__main__":
try:
pool = Pool(processes=8)
resultset = pool.imap(key_loop, keys, chunksize=200)
with open("C:\\Users\\mp_streaming_test.csv", 'w') as file:
writer = csv.writer(file)
for listitem in resultset:
writer.writerow(listitem)
print "finished load"
except:
print 'There was a problem multithreading the key Pool'
raise
再次强调,这里有几个改动:
- 直接遍历
resultset
,而不是先把它复制到一个列表里,这样可以省去不必要的步骤。 - 直接把
keys
列表传给pool.imap
,而不是先把它变成生成器。 - 给
imap
提供一个比默认值1更大的chunksize
。更大的chunksize
可以减少在进程之间传递keys
中的值时所需的通信成本,这样在keys
很大的情况下(就像你这个例子一样)可以显著提高性能。你可以尝试不同的chunksize
值(比如试试比200大得多的值,比如5000等),看看对性能的影响。我猜200可能不够,但肯定比1要好。