如果子进程导致段错误,multiprocessing.Pool会挂起
我想用 multiprocessing.Pool 来并行执行一个函数。问题是,如果某次函数调用导致了段错误,那个 Pool 就会一直卡在那里,无法继续。有没有人知道怎么做才能让这个 Pool 能够检测到这种情况,并且抛出一个错误呢?
下面的例子展示了如何重现这个问题(需要 scikit-learn 版本大于 0.14)
import numpy as np
from sklearn.ensemble import gradient_boosting
import time
from multiprocessing import Pool
class Bad(object):
tree_ = None
def fit_one(i):
if i == 3:
# this will segfault
bad = np.array([[Bad()] * 2], dtype=np.object)
gradient_boosting.predict_stages(bad,
np.random.rand(20, 2).astype(np.float32),
1.0, np.random.rand(20, 2))
else:
time.sleep(1)
return i
pool = Pool(2)
out = pool.imap_unordered(fit_one, range(10))
# we will never see 3
for o in out:
print o
4 个回答
我没有运行你的例子来看看它是否能处理错误,但你可以试试并发未来(concurrent futures)。只需把我的函数my_function(i)换成你的fit_one(i)。保持__name__=='__main__':
这个结构。并发未来似乎需要这个。下面的代码在我的电脑上测试过,所以希望在你的电脑上也能直接运行。
import concurrent.futures
def my_function(i):
print('function running')
return i
def run():
number_processes=4
executor = concurrent.futures.ProcessPoolExecutor(number_processes)
futures = [executor.submit(my_function,i) for i in range(10)]
concurrent.futures.wait(futures)
for f in futures:
print(f.result())
if __name__ == '__main__':
run()
与其使用 Pool().imap()
,你可能更想自己手动创建子进程,使用 Process()
。我敢打赌,这样返回的对象可以让你知道任何子进程的运行状态。这样你就能知道它们是否卡住了。
正如评论中所提到的,如果你使用 concurrent.Futures.ProcessPoolExecutor
而不是 multiprocessing.Pool
,那么在 Python 3 中这个方法就能正常工作。
如果你还在用 Python 2,我找到的最佳选择是使用 Pool.apply_async
和 Pool.map_async
返回的结果对象上的 timeout
参数。比如:
pool = Pool(2)
out = pool.map_async(fit_one, range(10))
for o in out:
print o.get(timeout=1000) # allow 1000 seconds max
只要你能确定一个子进程完成任务所需的最长时间,这个方法就能正常运作。
这是Python中的一个已知问题,编号22393。只要你在使用multiprocessing.pool
,就没有什么有效的解决办法,直到这个问题被修复。虽然在那个链接上有一个补丁可以解决这个问题,但它还没有被合并到正式版本中,所以目前没有稳定的Python版本能修复这个问题。