Python多进程池在join时挂起?

53 投票
3 回答
32912 浏览
提问于 2025-04-17 18:31

我正在尝试在多个文件上并行运行一些Python代码。基本的结构是这样的:

def process_file(filename, foo, bar, baz=biz):
    # do stuff that may fail and cause exception

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=psize)

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
    pool.close()
    pool.join()

我之前用过pool.map来做类似的事情,效果很好,但在这里我似乎不能使用它,因为pool.map似乎不允许我传递额外的参数(而且用lambda函数也不行,因为lambda不能被序列化)。

所以现在我尝试直接使用apply_async()来解决这个问题。我的问题是,代码似乎卡住了,永远也不会退出。有几个文件出现了异常,但我不知道是什么原因导致join失败或卡住?有趣的是,如果没有文件出现异常,它就能正常退出。

我漏掉了什么呢?

补充:当函数(也就是一个工作进程)失败时,我看到这个异常:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

如果我看到其中一个异常,父进程就会永远卡住,永远不会处理子进程并退出。

3 个回答

3

说实话,我遇到过一个类似的bug(虽然不是完全一样),当时是pool.map卡住了。的使用场景让我可以用pool.terminate来解决这个问题(在你修改任何东西之前,确保你的情况也能这样做)。

在调用terminate之前,我使用了pool.map,这样我就知道所有的任务都完成了,参考文档

这是一个与内置的map()函数相似的并行版本(不过它只支持一个可迭代的参数)。它会一直等到结果准备好。

如果你的情况也是这样,这可能是一个解决办法。

5

其实在某些情况下,如果你需要把一个对象保存起来(也就是“序列化”),你可以用 functools.partial 来代替 lambda。从 Python 2.7 开始,partial 对象就可以被保存了,Python 3 也是这样。

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:])
58

抱歉我自己回答自己的问题,但我找到了一种解决方法,所以如果有人遇到类似的问题,我想在这里分享一下。我也欢迎更好的答案。

我认为问题的根源在于这个链接:http://bugs.python.org/issue9400。这让我明白了两件事:

  • 我不是疯了,我想做的事情确实应该能成功
  • 至少在python2中,把“异常”传回父进程是非常困难的,甚至可以说是不可能的。简单的异常可以,但很多其他的就不行。

在我的案例中,我的工作函数启动了一个子进程,但这个子进程出现了段错误(segfault)。这导致返回了一个CalledProcessError异常,而这个异常是无法被序列化的。出于某种原因,这让父进程中的池对象失去了响应,无法从join()调用中返回。

在我的具体情况下,我并不在乎这个异常是什么。最多我只想记录一下,然后继续进行。为此,我只需在我的顶层工作函数中加上一个try/except语句。如果工作函数抛出任何异常,它会在尝试返回父进程之前被捕获,记录下来,然后工作进程正常退出,因为它不再试图发送异常。具体代码如下:

def process_file_wrapped(filenamen, foo, bar, baz=biz):
    try:
        process_file(filename, foo, bar, baz=biz)
    except:
        print('%s: %s' % (filename, traceback.format_exc()))

然后,我在最开始的map函数调用中使用process_file_wrapped(),而不是原来的那个。现在我的代码按预期工作了。

撰写回答