获取通过multiprocessing.Pool.map启动的进程的退出代码
我正在使用Python的multiprocessing
模块来并行处理一些计算量大的任务。最简单的选择就是使用一个Pool
(工作池),然后用map
方法来处理。
不过,进程有可能会出错。例如,它们可能会被系统的oom-killer
悄悄终止。因此,我希望能够获取通过map
启动的进程的退出代码。
另外,为了记录日志,我还希望能够知道每个值在可迭代对象中执行时所启动的进程ID(PID)。
1 个回答
如果你在使用 multiprocessing.Pool.map
,通常你并不关心池中子进程的 退出代码,而是更关注它们完成工作后返回的值。这是因为在正常情况下,池中的进程不会在你 close
/join
池之前退出,所以在所有工作完成之前,你是无法获取退出代码的,直到 Pool
快要被销毁时才会有。因此,没有公开的接口可以获取这些子进程的退出代码。
现在,如果你担心一些特殊情况,比如在子进程工作时被外部因素杀掉。如果遇到这种问题,你可能会发现一些奇怪的行为。实际上,在我的测试中,当我在 map
调用期间杀掉一个 Pool
中的进程时,map
从未完成,因为被杀掉的进程没有完成。不过,Python 会立即启动一个新进程来替代我杀掉的那个。
也就是说,你可以通过直接访问池中的 multiprocessing.Process
对象,使用私有的 _pool
属性来获取池中每个进程的进程 ID(pid):
pool = multiprocessing.Pool()
for proc in pool._pool:
print proc.pid
所以,你可以尝试在调用 map_async
之前和之后检查池中进程的列表,以此来检测某个进程是否意外死亡(前提是你没有因为阻塞调用而卡住)。
before = pool._pool[:] # Make a copy of the list of Process objects in our pool
result = pool.map_async(func, iterable) # Use map_async so we don't get stuck.
while not result.ready(): # Wait for the call to complete
if any(proc.exitcode for proc in before): # Abort if one of our original processes is dead.
print "One of our processes has exited. Something probably went horribly wrong."
break
result.wait(timeout=1)
else: # We'll enter this block if we don't reach `break` above.
print result.get() # Actually fetch the result list here.
我们需要复制这个列表,因为当 Pool
中的一个进程死亡时,Python 会立即用一个新进程替代它,并从列表中移除已死亡的进程。
在我的测试中,这种方法是有效的,但因为它依赖于 Pool
对象的私有属性(_pool
),在生产代码中使用是有风险的。我还建议,过于担心这种情况可能有些多余,因为这种情况发生的可能性非常小,而且会显著增加实现的复杂性。