多进程池中抛出的异常未被检测到

92 投票
9 回答
82514 浏览
提问于 2025-04-16 21:44

看起来,当从一个多进程的池子(multiprocessing.Pool)中抛出异常时,程序不会显示任何错误信息或者失败的迹象。举个例子:

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

这个代码会打印出1,然后悄悄地停止了。有趣的是,如果抛出一个BaseException(基础异常),就能正常工作。那么,有没有办法让所有的异常都像BaseException那样处理呢?

9 个回答

26

目前投票最多的解决方案存在一个问题:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

正如 @dfrankow 提到的,它会在 x.get() 这一步卡住,这样就失去了异步运行任务的意义。所以,为了提高效率(特别是当你的工作函数 go 运行时间很长时),我会把它改成:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

优点:工作函数是异步运行的,比如说如果你在多个核心上同时运行很多任务,这样会比原来的方案高效得多。

缺点:如果工作函数里面出现了异常,它只会在所有任务完成后才会被抛出。这种行为可能是你想要的,也可能不是。 根据 @colinfang 的评论进行了编辑,修复了这个问题。

65

也许我理解错了,但这不就是Result对象的get方法返回的内容吗?可以参考一下进程池的文档。

class multiprocessing.pool.AsyncResult

这是通过Pool.apply_async()和Pool.map_async().get([timeout])返回的结果类。
当结果到达时返回。如果设置了超时时间(timeout)且结果在指定的秒数内没有到达,就会抛出multiprocessing.TimeoutError。如果远程调用抛出了异常,get()方法会重新抛出这个异常。

所以,稍微修改一下你的例子,可以这样做:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

这样得到的结果是:

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

这并不是完全令人满意,因为它没有打印出错误追踪信息,但总比什么都没有要好。

更新:这个bug在Python 3.4中已经修复,感谢Richard Oudkerk。可以查看这个问题get方法应该返回完整的错误追踪信息

32

我有一个不错的解决办法,至少在调试的时候能用。目前我还没有办法把异常抛回主进程。最开始我想用装饰器,但你只能对模块顶层定义的函数进行序列化,所以这个想法就不行了。

于是,我想出了一个简单的包装类和一个池的子类,用这个来处理 apply_async(因此也包括 apply)。至于 map_async,就留给读者自己去研究了。

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

这样我就得到了:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception

撰写回答