Python 多进程提前终止

5 投票
1 回答
5394 浏览
提问于 2025-04-21 02:52

在我的脚本运行的时候,有可能会出现错误。如果发生错误,所有的进程都应该被正确地终止,返回一个错误信息,然后脚本应该退出。

我现在的代码似乎还没有做到这一点。当出现错误时,错误信息会被发送到 report_error(),但脚本在终端中会卡住,而且活动监视器显示还有很多Python进程在运行。

环境信息

  • Mac OS X 10.8.5
  • Python 3.3.3

那么,如何才能在脚本的任何地方正确地终止所有进程呢?

#!/usr/bin/env python3
# -*- coding: utf-8 -*-


import sys
from multiprocessing import Pool


# Global variables.

input_files = [
    'test_data_0.csv',
    'test_data_1.csv'
]


def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    sys.exit(1)

    # What I really want is to report the error, properly terminate all processes,
    # and then exit the script.


def read_file(file):

    try:
        # Read file into list.
    except Exception as error:
        report_error(error)


def check_file(file):

    # Do some error checking on file.
    if error:
        report_error(error)


def job(file):

    # Executed on each item in input_files.

    check_file(file)
    read_file(file)


def main():

    # Sets up a process pool. Defaults to number of cores.
    # Each input gets passed to job and processed in a separate process.
    p = Pool()
    p.map(job, input_files)

    # Closing and joining a pool is important to ensure all resources are freed properly.
    p.close()
    p.join()


if __name__ == '__main__':
    main()

1 个回答

10

首先,使用 sys.exit() 来结束子进程会导致整个进程池出问题,这样 map 命令就会一直卡在那里。现在的 multiprocessing 模块在子进程处理任务时,如果发生崩溃,无法正确恢复(这里有个关于这个问题的bug报告和修复补丁,链接在这里)。

其实,有几种方法可以实现你想要的功能。看起来你并不关心子进程返回的值,最简单的方法是用 imap_unordered 替代 map,在子进程发生错误时抛出异常,然后简单地遍历 imap_unordered 返回的迭代器:

def report_error(error):

    # Reports errors then exits script.
    print("Error: {0}".format(error), file=sys.stderr)
    raise error # Raise the exception

...

def main():
    p = Pool()
    try:
        list(p.imap_unordered(job, input_files))
    except Exception:
        print("a worker failed, aborting...")
        p.close()
        p.terminate()
    else:
        p.close()
        p.join()

if __name__ == '__main__':
    main()

使用 imap_unordered 时,子进程一有结果就会立刻返回给父进程。所以,如果子进程返回了一个异常,父进程会立刻捕获到这个异常。我们可以捕获这个异常,打印一条消息,然后结束进程池。

撰写回答