使用多进程工作池

10 投票
2 回答
12169 浏览
提问于 2025-04-15 15:10

我写了以下代码来让我的第二个CPU核心发挥作用。这个代码的基本功能是先在文件夹中找到想要的“sea”文件,然后执行一系列外部脚本来处理这些二进制的“sea”文件,最终生成50到100个文本和二进制文件。正如问题标题所说,我希望能并行处理,以提高处理速度。

这个问题源于我们在IPython用户讨论组中进行的长时间讨论,标题是“无法启动ipcluster”。我开始尝试IPython的并行处理功能。

问题是我无法正确运行这段代码。如果包含“sea”文件的文件夹里只放有“sea”文件,脚本执行完后并没有完全运行所有的外部脚本。(比如我有30到50个外部脚本要运行,但我的多进程脚本只在执行完第一个外部脚本后就结束了。)有趣的是,如果我在一个已经处理过的文件夹中运行这个脚本(也就是说,这个文件夹里的“sea”文件已经处理过,输出文件也已经存在),那么它就能运行,这次的处理速度比线性处理快了大约2.4到2.7倍。这个结果让我有些意外,因为我笔记本里的CPU只是一个2.5 GHz的Core 2 Duo。虽然我有一个支持CUDA的GPU,但这和我目前的并行计算问题没有关系 :)

你觉得这个问题的原因可能是什么呢?

感谢大家的评论和建议。

#!/usr/bin/env python

from multiprocessing import Pool
from subprocess import call
import os


def find_sea_files():

   file_list, path_list = [], []
   init = os.getcwd()

   for root, dirs, files in os.walk('.'):
      dirs.sort()
      for file in files:
          if file.endswith('.sea'):
              file_list.append(file)
              os.chdir(root)
              path_list.append(os.getcwd())
              os.chdir(init)

   return file_list, path_list


def process_all(pf):
   os.chdir(pf[0])
   call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
   pool = Pool(processes=2)              # start 2 worker processes
   files, paths = find_sea_files()
   pathfile = [[paths[i],files[i]] for i in range(len(files))]
   pool.map(process_all, pathfile)

2 个回答

3

我能想到几个问题:

1) 你有没有打印出路径文件?你确定它们都生成得正确吗?

a) 我这么问是因为你的 os.walk 用法有点特别;dirs.sort() 应该没问题,但看起来有点多余。一般来说,不应该使用 os.chdir();虽然恢复操作应该没问题,但你最好直接把根目录加到初始化里。

2) 我见过在 python2.6 中使用多进程时,从进程池中生成子进程会有问题。(我有个脚本就是用多进程来生成子进程,但那些子进程无法正确使用多进程,导致进程池卡住了。)你可以试试 python2.5 加上多进程的回退版本。

3) 试试 picloud 的 cloud.mp 模块(它封装了多进程,但处理进程池的方式稍有不同),看看能否解决问题。

你可以这样做:

cloud.mp.join(cloud.mp.map(process_all, pathfile))

(免责声明:我是 PiCloud 的开发者之一)

6

我建议先多了解一下工作进程的情况。Python的多进程模块自带了日志功能,可以用来记录子进程的信息。如果你已经简化了代码来找出问题所在,可以试着加几个打印语句来调试,比如这样(或者你可以把pf数组以更好看的方式打印出来):


def process_all(pf):
   print "PID: ", os.getpid()
   print "Script Dir: ", pf[0]
   print "Script: ", pf[1]
   os.chdir(pf[0])
   call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
   pool = Pool(processes=2)
   files, paths = find_sea_files()
   pathfile = [[paths[i],files[i]] for i in range(len(files))]
   pool.map(process_all, pathfile, 1) # Ensure the chunk size is 1
   pool.close()
   pool.join()

我用的Python版本是2.6.4。

撰写回答