Python多进程进程无声崩溃

16 投票
4 回答
16958 浏览
提问于 2025-04-17 19:49

我正在使用 Python 2.7.3。我通过子类化 multiprocessing.Process 对象来实现代码的并行处理。如果我在子类化的 Process 对象中没有错误,所有的代码都能正常运行。但是如果在这些对象的代码中出现错误,它们会悄无声息地崩溃(父进程的控制台不会显示任何错误信息),而且 CPU 使用率会降到零。父进程本身不会崩溃,这让人觉得程序似乎在卡住。与此同时,找出代码中的错误非常困难,因为没有任何提示告诉我错误出在哪里。

我在 StackOverflow 上找不到其他人问过类似的问题。

我猜子类化的 Process 对象之所以悄悄崩溃,是因为它们无法将错误信息打印到父进程的控制台,但我想知道我可以做些什么,以便至少能更有效地调试(这样其他使用我代码的人也能在遇到问题时告诉我)。

编辑:我的实际代码太复杂了,但一个简单的子类化 Process 对象的例子,里面有错误,可能是这样的:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

4 个回答

1

这不是一个答案,只是一个扩展的评论。请运行这个程序,然后告诉我们你得到了什么输出(如果有的话):

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)

if __name__ == '__main__':
    inq, outq = Queue(), Queue()
    inq.put(1)
    inq.put('STOP')
    w = Worker(inq, outq)
    w.start()

我得到的是:

% test.py
Process Worker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/home/unutbu/pybin/test.py", line 21, in run
    1 / 0 # Dumb error
ZeroDivisionError: integer division or modulo by zero

如果你什么都没得到,我会很惊讶。

3

我建议一种方法来显示程序运行中的异常情况。

from multiprocessing import Process
import traceback


run_old = Process.run

def run_new(*args, **kwargs):
    try:
        run_old(*args, **kwargs)
    except (KeyboardInterrupt, SystemExit):
        raise
    except:
        traceback.print_exc(file=sys.stdout)

Process.run = run_new
19

你真正想要的是一种方法,把异常信息传递给父进程,对吧?这样你就可以按照自己的方式处理它们。

如果你使用 concurrent.futures.ProcessPoolExecutor,这个过程是自动的。如果你使用 multiprocessing.Pool,这也很简单。如果你使用显式的 ProcessQueue,那就需要做一些额外的工作,但其实也没那么复杂。

举个例子:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put(result)
    except Exception as e:
        self.outputQueue.put(e)

然后,你的调用代码可以像处理其他任何东西一样,从队列中读取 Exception。而不是这样:

yield outq.pop()

而是这样:

result = outq.pop()
if isinstance(result, Exception):
    raise result
yield result

(我不知道你实际的父进程读取队列的代码是怎么做的,因为你给的最小示例只是忽略了队列。但希望这能解释这个想法,尽管你的真实代码实际上并不是这样工作的。)

这里假设你想在任何未处理的异常发生时终止程序。如果你想把异常传回去并继续处理下一个 i in iter,只需把 try 移到 for 循环里面,而不是把它包裹在外面。

这也假设 Exception 不是有效的值。如果这是个问题,最简单的解决办法就是推送 (result, exception) 的元组:

def run(self):
    try:
        for i in iter(self.inputQueue.get, 'STOP'):
            # (code that does stuff)
            1 / 0 # Dumb error
            # (more code that does stuff)
            self.outputQueue.put((result, None))
    except Exception as e:
        self.outputQueue.put((None, e))

然后,你的弹出代码可以这样做:

result, exception = outq.pop()
if exception:
    raise exception
yield result

你可能会注意到,这和 node.js 的回调风格很相似,你需要把 (err, result) 传递给每个回调。是的,这很烦人,而且你会搞乱这种风格的代码。但实际上你并没有在其他地方使用这种方式,除了在包装器里;你所有的“应用级”代码在从队列获取值或在 run 内部调用时,看到的都是正常的返回值或抛出的异常。

你甚至可以考虑构建一个符合 concurrent.futures 规范的 Future,即使你是在手动排队和执行任务。其实这并不难,而且会给你一个非常不错的接口,特别是在调试时。

最后,值得注意的是,大多数围绕工作者和队列构建的代码可以通过执行器/池的设计变得简单很多,即使你非常确定每个队列只想要一个工作者。只需去掉所有的样板代码,把 Worker.run 方法中的循环变成一个函数(这个函数只需正常 returnraise,而不是把结果添加到队列中)。在调用方那边,再次去掉所有样板代码,只需 submitmap 任务函数及其参数。

你的整个示例可以简化为:

def job(i):
    # (code that does stuff)
    1 / 0 # Dumb error
    # (more code that does stuff)
    return result

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    results = executor.map(job, range(10))

这样就能自动正确处理异常了。


正如你在评论中提到的,异常的追踪信息不会追溯到子进程;它只会追溯到手动的 raise result 调用(或者,如果你使用的是池或执行器,则追溯到池或执行器的内部)。

原因是 multiprocessing.Queue 是基于 pickle 构建的,而序列化异常时并不会序列化它们的追踪信息。原因在于你无法序列化追踪信息。而追踪信息之所以不能序列化,是因为它们充满了对本地执行上下文的引用,所以让它们在另一个进程中工作会非常困难。

那么……你能对此做些什么呢?不要去寻找一个完全通用的解决方案。相反,想想你实际需要什么。90%的情况下,你想要的是“记录异常及其追踪信息,并继续”或者“将异常及其追踪信息打印到 stderr 并像默认的未处理异常处理程序那样 exit(1)”。对于这两种情况,你根本不需要传递异常;只需在子进程中格式化它,然后传递一个字符串。如果你确实需要更复杂的东西,明确你需要什么,并传递足够的信息来手动组合这些内容。如果你不知道如何格式化追踪信息和异常,可以查看 traceback 模块。它非常简单。这意味着你根本不需要接触序列化的机制。(并不是说 copyreg 一个序列化器或写一个带有 __reduce__ 方法的持有类很难,但如果你不需要,为什么要学习这些呢?)

撰写回答