子进程完成但未终止,导致死锁
好的,既然目前没有人回答这个问题,我也不觉得有什么不好。虽然我还是想知道到底发生了什么导致这个问题,但我最急需解决的问题是更新2中提到的那些。也就是说,
JoinableQueue
和Manager().Queue()
之间有什么区别(什么时候应该用其中一个而不是另一个)?更重要的是,在这个例子中,能否安全地用一个替换另一个?
在下面的代码中,我有一个简单的进程池。每个进程都会接收到一个进程队列(pq
),从中提取需要处理的数据,还有一个返回值队列(rq
),用于将处理后的结果返回给主线程。如果我不往返回值队列里添加内容,它就能正常工作,但一旦我添加了内容,出于某种原因,进程就无法停止。在这两种情况下,进程的run
方法都会返回,所以不是因为在返回队列上put
阻塞了,而在第二种情况下,进程本身并没有终止,因此当我在进程上调用join
时,程序就会死锁。这是为什么呢?
更新:
这似乎与队列中的项目数量有关。
至少在我的机器上,我可以在队列中放入最多6570个项目,这样就能正常工作,但如果超过这个数量,就会死锁。使用
Manager().Queue()
似乎可以正常工作。
这可能是JoinableQueue
的限制,或者只是我对这两个对象之间的区别理解有误。我发现如果我用Manager().Queue()
替换返回队列,它就能按预期工作。它们之间有什么区别,什么时候应该用其中一个而不是另一个呢?如果我在消费
rq
,就不会出现错误。
哦,刚才这里有个答案,等我评论的时候,它就消失了。无论如何,它提到的其中一件事是,如果我添加一个消费者,这个错误是否仍然会发生。我试过了,答案是,不会。它提到的另一件事是引用了多进程文档中的一句话,可能是问题的关键。提到
JoinableQueue
时,它说:... 用于计算未完成任务数量的信号量可能会最终溢出,从而引发异常。
import multiprocessing
class _ProcSTOP:
pass
class Proc(multiprocessing.Process):
def __init__(self, pq, rq):
self._pq = pq
self._rq = rq
super().__init__()
print('++', self.name)
def run(self):
dat = self._pq.get()
while not dat is _ProcSTOP:
# self._rq.put(dat) # uncomment me for deadlock
self._pq.task_done()
dat = self._pq.get()
self._pq.task_done()
print('==', self.name)
def __del__(self):
print('--', self.name)
if __name__ == '__main__':
pq = multiprocessing.JoinableQueue()
rq = multiprocessing.JoinableQueue()
pool = []
for i in range(4):
p = Proc(pq, rq)
p.start()
pool.append(p)
for i in range(10000):
pq.put(i)
pq.join()
for i in range(4):
pq.put(_ProcSTOP)
pq.join()
while len(pool) > 0:
print('??', pool)
pool.pop().join() # hangs here (if using rq)
print('** complete')
不使用返回队列的示例输出:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4
使用返回队列的示例输出:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs
1 个回答
来自文档的内容:
警告
如上所述,如果一个子进程在队列中放入了项目(而且它没有使用JoinableQueue.cancel_join_thread()),那么这个进程在所有缓冲的项目被处理完之前是不会结束的。
这意味着如果你尝试等待这个进程结束,可能会出现死锁,除非你确定所有放入队列的项目都已经被处理。同样,如果子进程不是守护进程,那么当父进程尝试等待所有非守护子进程结束时,可能会卡住。
需要注意的是,使用管理器创建的队列没有这个问题。请参见编程指南。
所以,JoinableQueue()使用的是管道,会等到所有数据处理完才能关闭。
另一方面,Manager.Queue()对象采用了完全不同的方法。管理器会运行一个单独的进程,立即接收所有数据(并将其存储在内存中)。
管理器提供了一种创建可以在不同进程之间共享的数据的方法。管理器对象控制一个服务器进程,该进程管理共享对象。其他进程可以通过使用代理来访问这些共享对象。
...
Queue([maxsize]) 创建一个共享的Queue.Queue对象,并返回它的代理。