Python 多进程提前终止
在我的脚本运行的时候,有可能会出现错误。如果发生错误,所有的进程都应该被正确地终止,返回一个错误信息,然后脚本应该退出。
我现在的代码似乎还没有做到这一点。当出现错误时,错误信息会被发送到 report_error()
,但脚本在终端中会卡住,而且活动监视器显示还有很多Python进程在运行。
环境信息
- Mac OS X 10.8.5
- Python 3.3.3
那么,如何才能在脚本的任何地方正确地终止所有进程呢?
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import sys
from multiprocessing import Pool
# Global variables.
input_files = [
'test_data_0.csv',
'test_data_1.csv'
]
def report_error(error):
# Reports errors then exits script.
print("Error: {0}".format(error), file=sys.stderr)
sys.exit(1)
# What I really want is to report the error, properly terminate all processes,
# and then exit the script.
def read_file(file):
try:
# Read file into list.
except Exception as error:
report_error(error)
def check_file(file):
# Do some error checking on file.
if error:
report_error(error)
def job(file):
# Executed on each item in input_files.
check_file(file)
read_file(file)
def main():
# Sets up a process pool. Defaults to number of cores.
# Each input gets passed to job and processed in a separate process.
p = Pool()
p.map(job, input_files)
# Closing and joining a pool is important to ensure all resources are freed properly.
p.close()
p.join()
if __name__ == '__main__':
main()
1 个回答
10
首先,使用 sys.exit()
来结束子进程会导致整个进程池出问题,这样 map
命令就会一直卡在那里。现在的 multiprocessing
模块在子进程处理任务时,如果发生崩溃,无法正确恢复(这里有个关于这个问题的bug报告和修复补丁,链接在这里)。
其实,有几种方法可以实现你想要的功能。看起来你并不关心子进程返回的值,最简单的方法是用 imap_unordered
替代 map
,在子进程发生错误时抛出异常,然后简单地遍历 imap_unordered
返回的迭代器:
def report_error(error):
# Reports errors then exits script.
print("Error: {0}".format(error), file=sys.stderr)
raise error # Raise the exception
...
def main():
p = Pool()
try:
list(p.imap_unordered(job, input_files))
except Exception:
print("a worker failed, aborting...")
p.close()
p.terminate()
else:
p.close()
p.join()
if __name__ == '__main__':
main()
使用 imap_unordered
时,子进程一有结果就会立刻返回给父进程。所以,如果子进程返回了一个异常,父进程会立刻捕获到这个异常。我们可以捕获这个异常,打印一条消息,然后结束进程池。