获取通过multiprocessing.Pool.map启动的进程的退出代码

9 投票
1 回答
10011 浏览
提问于 2025-04-18 10:56

我正在使用Python的multiprocessing模块来并行处理一些计算量大的任务。最简单的选择就是使用一个Pool(工作池),然后用map方法来处理。

不过,进程有可能会出错。例如,它们可能会被系统的oom-killer悄悄终止。因此,我希望能够获取通过map启动的进程的退出代码。

另外,为了记录日志,我还希望能够知道每个值在可迭代对象中执行时所启动的进程ID(PID)。

1 个回答

10

如果你在使用 multiprocessing.Pool.map,通常你并不关心池中子进程的 退出代码,而是更关注它们完成工作后返回的值。这是因为在正常情况下,池中的进程不会在你 close/join 池之前退出,所以在所有工作完成之前,你是无法获取退出代码的,直到 Pool 快要被销毁时才会有。因此,没有公开的接口可以获取这些子进程的退出代码。

现在,如果你担心一些特殊情况,比如在子进程工作时被外部因素杀掉。如果遇到这种问题,你可能会发现一些奇怪的行为。实际上,在我的测试中,当我在 map 调用期间杀掉一个 Pool 中的进程时,map 从未完成,因为被杀掉的进程没有完成。不过,Python 会立即启动一个新进程来替代我杀掉的那个。

也就是说,你可以通过直接访问池中的 multiprocessing.Process 对象,使用私有的 _pool 属性来获取池中每个进程的进程 ID(pid):

pool = multiprocessing.Pool()
for proc in pool._pool:
  print proc.pid

所以,你可以尝试在调用 map_async 之前和之后检查池中进程的列表,以此来检测某个进程是否意外死亡(前提是你没有因为阻塞调用而卡住)。

before = pool._pool[:]  # Make a copy of the list of Process objects in our pool
result = pool.map_async(func, iterable)  # Use map_async so we don't get stuck.
while not result.ready():  # Wait for the call to complete
    if any(proc.exitcode for proc in before):  # Abort if one of our original processes is dead.
        print "One of our processes has exited. Something probably went horribly wrong."
        break
    result.wait(timeout=1)
else:  # We'll enter this block if we don't reach `break` above.
    print result.get() # Actually fetch the result list here.

我们需要复制这个列表,因为当 Pool 中的一个进程死亡时,Python 会立即用一个新进程替代它,并从列表中移除已死亡的进程。

在我的测试中,这种方法是有效的,但因为它依赖于 Pool 对象的私有属性(_pool),在生产代码中使用是有风险的。我还建议,过于担心这种情况可能有些多余,因为这种情况发生的可能性非常小,而且会显著增加实现的复杂性。

撰写回答