Python: multiprocessing.map: 如果一个进程引发异常,为什么其他进程的finally块不被调用?

30 投票
3 回答
19806 浏览
提问于 2025-04-17 03:57

我的理解是,只要进入了try块,finally部分就一定会被执行。

import random

from multiprocessing import Pool
from time import sleep

def Process(x):
  try:
    print x
    sleep(random.random())
    raise Exception('Exception: ' + x)
  finally:
    print 'Finally: ' + x

Pool(3).map(Process, ['1','2','3'])

期望的输出是,对于每一个在第8行单独打印的x,一定会有一次'Finally x'的出现。

示例输出:

$ python bug.py 
1
2
3
Finally: 2
Traceback (most recent call last):
  File "bug.py", line 14, in <module>
    Pool(3).map(Process, ['1','2','3'])
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
    raise self._value
Exception: Exception: 2

看起来一个进程中的异常会终止父进程和兄弟进程,即使其他进程中还有需要完成的工作。

我哪里理解错了?为什么这样说是对的?如果这是对的,应该如何在多进程的Python中安全地清理资源呢?

3 个回答

1

finally 会重新抛出原来的错误,除非你在里面使用了 return。这样的话,错误就会被 Pool.map 抛出,导致你的整个程序崩溃。所有的子进程都会被终止,你也看不到其他的错误信息。

你可以加一个 return 来处理这个错误:

def Process(x):
  try:
    print x
    sleep(random.random())
    raise Exception('Exception: ' + x)
  finally:
    print 'Finally: ' + x
    return

这样的话,当出现错误时,你的 map 结果里就会是 None

5

来自 unutbu 的回答确实解释了你观察到的行为的 原因。不过需要强调的是,SIGTERM 这个信号之所以会被发送,是因为 multiprocessing.pool._terminate_pool 的实现方式。如果你能避免使用 Pool,那么就能得到你想要的行为。这里有一个 借来的例子

from multiprocessing import Process
from time import sleep
import random

def f(x):
    try:
        sleep(random.random()*10)
        raise Exception
    except:
        print "Caught exception in process:", x
        # Make this last longer than the except clause in main.
        sleep(3)
    finally:
        print "Cleaning up process:", x

if __name__ == '__main__':
    processes = []
    for i in range(4):
        p = Process(target=f, args=(i,))
        p.start()
        processes.append(p)
    try:
        for process in processes:
            process.join()
    except:
        print "Caught exception in main."
    finally:
        print "Cleaning up main."

在发送 SIGINT 后,示例输出是:

Caught exception in process: 0
^C
Cleaning up process: 0
Caught exception in main.
Cleaning up main.
Caught exception in process: 1
Caught exception in process: 2
Caught exception in process: 3
Cleaning up process: 1
Cleaning up process: 2
Cleaning up process: 3

注意,finally 这个部分会在所有进程中执行。如果你需要共享内存,可以考虑使用 QueuePipeManager,或者一些外部存储,比如 redissqlite3

36

简短回答:SIGTERM 优先于 finally

详细回答:可以通过 mp.log_to_stderr() 来开启日志记录:

import random
import multiprocessing as mp
import time
import logging

logger=mp.log_to_stderr(logging.DEBUG)

def Process(x):
    try:
        logger.info(x)
        time.sleep(random.random())
        raise Exception('Exception: ' + x)
    finally:
        logger.info('Finally: ' + x)

result=mp.Pool(3).map(Process, ['1','2','3'])

日志输出包括:

[DEBUG/MainProcess] terminating workers

这对应于 multiprocessing.pool._terminate_pool 中的这段代码:

    if pool and hasattr(pool[0], 'terminate'):
        debug('terminating workers')
        for p in pool:
            p.terminate()

pool 中的每个 p 都是一个 multiprocessing.Process,调用 terminate(至少在非Windows机器上)会发送 SIGTERM 信号:

来自 multiprocessing/forking.py

class Popen(object)
    def terminate(self):
        ...
            try:
                os.kill(self.pid, signal.SIGTERM)
            except OSError, e:
                if self.wait(timeout=0.1) is None:
                    raise

所以关键在于,当一个 Python 进程在 try 块中接收到 SIGTERM 时会发生什么。

考虑以下示例(test.py):

import time    
def worker():
    try:
        time.sleep(100)        
    finally:
        print('enter finally')
        time.sleep(2) 
        print('exit finally')    
worker()

如果你运行这个程序,然后发送一个 SIGTERM,那么进程会立即结束,而不会进入 finally 块,这可以通过没有输出和没有延迟来证明。

在一个终端中:

% test.py

在第二个终端中:

% pkill -TERM -f "test.py"

第一个终端的结果:

Terminated

对比一下,当进程接收到 SIGINTC-c)时会发生什么:

在第二个终端中:

% pkill -INT -f "test.py"

第一个终端的结果:

enter finally
exit finally
Traceback (most recent call last):
  File "/home/unutbu/pybin/test.py", line 14, in <module>
    worker()
  File "/home/unutbu/pybin/test.py", line 8, in worker
    time.sleep(100)        
KeyboardInterrupt

结论:SIGTERM 优先于 finally

撰写回答