Python多进程进程无声崩溃
我正在使用 Python 2.7.3。我通过子类化 multiprocessing.Process
对象来实现代码的并行处理。如果我在子类化的 Process 对象中没有错误,所有的代码都能正常运行。但是如果在这些对象的代码中出现错误,它们会悄无声息地崩溃(父进程的控制台不会显示任何错误信息),而且 CPU 使用率会降到零。父进程本身不会崩溃,这让人觉得程序似乎在卡住。与此同时,找出代码中的错误非常困难,因为没有任何提示告诉我错误出在哪里。
我在 StackOverflow 上找不到其他人问过类似的问题。
我猜子类化的 Process 对象之所以悄悄崩溃,是因为它们无法将错误信息打印到父进程的控制台,但我想知道我可以做些什么,以便至少能更有效地调试(这样其他使用我代码的人也能在遇到问题时告诉我)。
编辑:我的实际代码太复杂了,但一个简单的子类化 Process 对象的例子,里面有错误,可能是这样的:
from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self, inputQueue, outputQueue):
super(Worker, self).__init__()
self.inputQueue = inputQueue
self.outputQueue = outputQueue
def run(self):
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
4 个回答
这不是一个答案,只是一个扩展的评论。请运行这个程序,然后告诉我们你得到了什么输出(如果有的话):
from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self, inputQueue, outputQueue):
super(Worker, self).__init__()
self.inputQueue = inputQueue
self.outputQueue = outputQueue
def run(self):
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
if __name__ == '__main__':
inq, outq = Queue(), Queue()
inq.put(1)
inq.put('STOP')
w = Worker(inq, outq)
w.start()
我得到的是:
% test.py
Process Worker-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/home/unutbu/pybin/test.py", line 21, in run
1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero
如果你什么都没得到,我会很惊讶。
我建议一种方法来显示程序运行中的异常情况。
from multiprocessing import Process
import traceback
run_old = Process.run
def run_new(*args, **kwargs):
try:
run_old(*args, **kwargs)
except (KeyboardInterrupt, SystemExit):
raise
except:
traceback.print_exc(file=sys.stdout)
Process.run = run_new
你真正想要的是一种方法,把异常信息传递给父进程,对吧?这样你就可以按照自己的方式处理它们。
如果你使用 concurrent.futures.ProcessPoolExecutor
,这个过程是自动的。如果你使用 multiprocessing.Pool
,这也很简单。如果你使用显式的 Process
和 Queue
,那就需要做一些额外的工作,但其实也没那么复杂。
举个例子:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
except Exception as e:
self.outputQueue.put(e)
然后,你的调用代码可以像处理其他任何东西一样,从队列中读取 Exception
。而不是这样:
yield outq.pop()
而是这样:
result = outq.pop()
if isinstance(result, Exception):
raise result
yield result
(我不知道你实际的父进程读取队列的代码是怎么做的,因为你给的最小示例只是忽略了队列。但希望这能解释这个想法,尽管你的真实代码实际上并不是这样工作的。)
这里假设你想在任何未处理的异常发生时终止程序。如果你想把异常传回去并继续处理下一个 i in iter
,只需把 try
移到 for
循环里面,而不是把它包裹在外面。
这也假设 Exception
不是有效的值。如果这是个问题,最简单的解决办法就是推送 (result, exception)
的元组:
def run(self):
try:
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put((result, None))
except Exception as e:
self.outputQueue.put((None, e))
然后,你的弹出代码可以这样做:
result, exception = outq.pop()
if exception:
raise exception
yield result
你可能会注意到,这和 node.js 的回调风格很相似,你需要把 (err, result)
传递给每个回调。是的,这很烦人,而且你会搞乱这种风格的代码。但实际上你并没有在其他地方使用这种方式,除了在包装器里;你所有的“应用级”代码在从队列获取值或在 run
内部调用时,看到的都是正常的返回值或抛出的异常。
你甚至可以考虑构建一个符合 concurrent.futures
规范的 Future
,即使你是在手动排队和执行任务。其实这并不难,而且会给你一个非常不错的接口,特别是在调试时。
最后,值得注意的是,大多数围绕工作者和队列构建的代码可以通过执行器/池的设计变得简单很多,即使你非常确定每个队列只想要一个工作者。只需去掉所有的样板代码,把 Worker.run
方法中的循环变成一个函数(这个函数只需正常 return
或 raise
,而不是把结果添加到队列中)。在调用方那边,再次去掉所有样板代码,只需 submit
或 map
任务函数及其参数。
你的整个示例可以简化为:
def job(i):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
return result
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
results = executor.map(job, range(10))
这样就能自动正确处理异常了。
正如你在评论中提到的,异常的追踪信息不会追溯到子进程;它只会追溯到手动的 raise result
调用(或者,如果你使用的是池或执行器,则追溯到池或执行器的内部)。
原因是 multiprocessing.Queue
是基于 pickle
构建的,而序列化异常时并不会序列化它们的追踪信息。原因在于你无法序列化追踪信息。而追踪信息之所以不能序列化,是因为它们充满了对本地执行上下文的引用,所以让它们在另一个进程中工作会非常困难。
那么……你能对此做些什么呢?不要去寻找一个完全通用的解决方案。相反,想想你实际需要什么。90%的情况下,你想要的是“记录异常及其追踪信息,并继续”或者“将异常及其追踪信息打印到 stderr
并像默认的未处理异常处理程序那样 exit(1)
”。对于这两种情况,你根本不需要传递异常;只需在子进程中格式化它,然后传递一个字符串。如果你确实需要更复杂的东西,明确你需要什么,并传递足够的信息来手动组合这些内容。如果你不知道如何格式化追踪信息和异常,可以查看 traceback
模块。它非常简单。这意味着你根本不需要接触序列化的机制。(并不是说 copyreg
一个序列化器或写一个带有 __reduce__
方法的持有类很难,但如果你不需要,为什么要学习这些呢?)