子进程完成但未终止,导致死锁

6 投票
1 回答
1751 浏览
提问于 2025-04-17 05:44

好的,既然目前没有人回答这个问题,我也不觉得有什么不好。虽然我还是想知道到底发生了什么导致这个问题,但我最急需解决的问题是更新2中提到的那些。也就是说,

JoinableQueueManager().Queue()之间有什么区别(什么时候应该用其中一个而不是另一个)?更重要的是,在这个例子中,能否安全地用一个替换另一个?


在下面的代码中,我有一个简单的进程池。每个进程都会接收到一个进程队列(pq),从中提取需要处理的数据,还有一个返回值队列(rq),用于将处理后的结果返回给主线程。如果我不往返回值队列里添加内容,它就能正常工作,但一旦我添加了内容,出于某种原因,进程就无法停止。在这两种情况下,进程的run方法都会返回,所以不是因为在返回队列上put阻塞了,而在第二种情况下,进程本身并没有终止,因此当我在进程上调用join时,程序就会死锁。这是为什么呢?

更新:

  1. 这似乎与队列中的项目数量有关。

    至少在我的机器上,我可以在队列中放入最多6570个项目,这样就能正常工作,但如果超过这个数量,就会死锁。

  2. 使用Manager().Queue()似乎可以正常工作。

    这可能是JoinableQueue的限制,或者只是我对这两个对象之间的区别理解有误。我发现如果我用Manager().Queue()替换返回队列,它就能按预期工作。它们之间有什么区别,什么时候应该用其中一个而不是另一个呢?

  3. 如果我在消费rq,就不会出现错误。

    哦,刚才这里有个答案,等我评论的时候,它就消失了。无论如何,它提到的其中一件事是,如果我添加一个消费者,这个错误是否仍然会发生。我试过了,答案是,不会。

    它提到的另一件事是引用了多进程文档中的一句话,可能是问题的关键。提到JoinableQueue时,它说:

    ... 用于计算未完成任务数量的信号量可能会最终溢出,从而引发异常。


import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while not dat is _ProcSTOP:
#            self._rq.put(dat)        # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done() 
        print('==', self.name)

    def __del__(self):
        print('--', self.name)

if __name__ == '__main__':

    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq) 
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
       pq.put(_ProcSTOP)

    pq.join()

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()    # hangs here (if using rq)

    print('** complete')

不使用返回队列的示例输出:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4

使用返回队列的示例输出:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs

1 个回答

0

来自文档的内容:

警告

如上所述,如果一个子进程在队列中放入了项目(而且它没有使用JoinableQueue.cancel_join_thread()),那么这个进程在所有缓冲的项目被处理完之前是不会结束的。

这意味着如果你尝试等待这个进程结束,可能会出现死锁,除非你确定所有放入队列的项目都已经被处理。同样,如果子进程不是守护进程,那么当父进程尝试等待所有非守护子进程结束时,可能会卡住。

需要注意的是,使用管理器创建的队列没有这个问题。请参见编程指南。

所以,JoinableQueue()使用的是管道,会等到所有数据处理完才能关闭。

另一方面,Manager.Queue()对象采用了完全不同的方法。管理器会运行一个单独的进程,立即接收所有数据(并将其存储在内存中)。

管理器提供了一种创建可以在不同进程之间共享的数据的方法。管理器对象控制一个服务器进程,该进程管理共享对象。其他进程可以通过使用代理来访问这些共享对象。

...

Queue([maxsize]) 创建一个共享的Queue.Queue对象,并返回它的代理。

撰写回答