multiprocessing.Pool的apply_async中,工人如何捕获错误并继续?
在使用 multiprocessing.Pool 的 apply_async()
时,代码中的中断会发生什么呢?我觉得这主要是指异常情况,但可能还有其他原因导致工作函数失败。
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
pool.apply_async(workerfunct, args=(*args), callback=callbackfunct)
根据我目前的理解,当一个进程或工作函数失败时(其他进程还在继续运行),抛出的错误之后的代码不会被执行,即使我用 try/except 捕获了这个错误。
举个例子,通常我会捕获错误,然后设置一个默认值或者打印出错误信息,这样代码就可以继续运行。如果我的回调函数需要写入文件,那就会用默认值来写。
我怀疑你在示例代码中没有看到任何结果的原因是因为你所有的工作函数调用都失败了。如果一个工作函数失败了,回调函数就不会被执行。除非你尝试从 apply_async 返回的 AsyncResult 对象中获取结果,否则这个失败不会被报告。不过,由于你没有保存这些对象,你就永远不知道失败发生了。如果我是你,我会在测试时使用 pool.apply,这样你可以在错误发生时立即看到。
1 个回答
8
如果你在使用 Python 3.2 及以上版本,可以利用 error_callback
这个参数来处理工作进程中出现的错误。
pool.apply_async(workerfunct, args=(*args), callback=callbackfunct, error_callback=handle_error)
handle_error
函数会接收到一个错误对象作为参数。
如果你使用的版本低于这个,那你就得把所有的工作函数放在 try
/except
语句中,这样才能确保你的 callback
会被执行。(我想你可能从我在另一个问题中的回答中得到了这个不会工作的印象,但其实并不是这样。抱歉!):
def workerfunct(*args):
try:
# Stuff
except Exception as e:
# Do something here, maybe return e?
pool.apply_async(workerfunct, args=(*args), callback=callbackfunct)
如果你不能或者不想修改你实际想调用的函数,你也可以使用一个包装函数:
def wrapper(func, *args):
try:
return func(*args)
except Exception as e:
return e
pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct)