使用多进程工作池
我写了以下代码来让我的第二个CPU核心发挥作用。这个代码的基本功能是先在文件夹中找到想要的“sea”文件,然后执行一系列外部脚本来处理这些二进制的“sea”文件,最终生成50到100个文本和二进制文件。正如问题标题所说,我希望能并行处理,以提高处理速度。
这个问题源于我们在IPython用户讨论组中进行的长时间讨论,标题是“无法启动ipcluster”。我开始尝试IPython的并行处理功能。
问题是我无法正确运行这段代码。如果包含“sea”文件的文件夹里只放有“sea”文件,脚本执行完后并没有完全运行所有的外部脚本。(比如我有30到50个外部脚本要运行,但我的多进程脚本只在执行完第一个外部脚本后就结束了。)有趣的是,如果我在一个已经处理过的文件夹中运行这个脚本(也就是说,这个文件夹里的“sea”文件已经处理过,输出文件也已经存在),那么它就能运行,这次的处理速度比线性处理快了大约2.4到2.7倍。这个结果让我有些意外,因为我笔记本里的CPU只是一个2.5 GHz的Core 2 Duo。虽然我有一个支持CUDA的GPU,但这和我目前的并行计算问题没有关系 :)
你觉得这个问题的原因可能是什么呢?
感谢大家的评论和建议。
#!/usr/bin/env python
from multiprocessing import Pool
from subprocess import call
import os
def find_sea_files():
file_list, path_list = [], []
init = os.getcwd()
for root, dirs, files in os.walk('.'):
dirs.sort()
for file in files:
if file.endswith('.sea'):
file_list.append(file)
os.chdir(root)
path_list.append(os.getcwd())
os.chdir(init)
return file_list, path_list
def process_all(pf):
os.chdir(pf[0])
call(['postprocessing_saudi', pf[1]])
if __name__ == '__main__':
pool = Pool(processes=2) # start 2 worker processes
files, paths = find_sea_files()
pathfile = [[paths[i],files[i]] for i in range(len(files))]
pool.map(process_all, pathfile)
2 个回答
我能想到几个问题:
1) 你有没有打印出路径文件?你确定它们都生成得正确吗?
a) 我这么问是因为你的 os.walk 用法有点特别;dirs.sort() 应该没问题,但看起来有点多余。一般来说,不应该使用 os.chdir();虽然恢复操作应该没问题,但你最好直接把根目录加到初始化里。
2) 我见过在 python2.6 中使用多进程时,从进程池中生成子进程会有问题。(我有个脚本就是用多进程来生成子进程,但那些子进程无法正确使用多进程,导致进程池卡住了。)你可以试试 python2.5 加上多进程的回退版本。
3) 试试 picloud 的 cloud.mp 模块(它封装了多进程,但处理进程池的方式稍有不同),看看能否解决问题。
你可以这样做:
cloud.mp.join(cloud.mp.map(process_all, pathfile))
(免责声明:我是 PiCloud 的开发者之一)
我建议先多了解一下工作进程的情况。Python的多进程模块自带了日志功能,可以用来记录子进程的信息。如果你已经简化了代码来找出问题所在,可以试着加几个打印语句来调试,比如这样(或者你可以把pf数组以更好看的方式打印出来):
def process_all(pf):
print "PID: ", os.getpid()
print "Script Dir: ", pf[0]
print "Script: ", pf[1]
os.chdir(pf[0])
call(['postprocessing_saudi', pf[1]])
if __name__ == '__main__':
pool = Pool(processes=2)
files, paths = find_sea_files()
pathfile = [[paths[i],files[i]] for i in range(len(files))]
pool.map(process_all, pathfile, 1) # Ensure the chunk size is 1
pool.close()
pool.join()
我用的Python版本是2.6.4。