Python多进程队列的可靠性,Queue与SimpleQueue与JoinableQueue的对比

12 投票
1 回答
8001 浏览
提问于 2025-04-17 15:07

直接引用Python的文档

类 multiprocessing.Queue([maxsize])

...

qsize() 返回队列的大致大小。由于多线程/多进程的特性,这个数字并不可靠。

empty() 如果队列为空,返回True;否则返回False。由于多线程/多进程的特性,这个结果也不可靠。

我在实际使用中发现,这些说法对于Queue来说非常准确,尤其是empty()这个方法。

在我的代码里,有很多进程(每个都是同一个主进程的子进程),它们在各自的run方法中都有以下内容:

while self.active:
    if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
        try:
            self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
        except Queue.Empty as empty_queue:
            continue
    else:
        task = self.exclusive_queue.get()
        self.compute(task)

基本上,这个进程会在general_queue上等待工作,但首先会检查它的exclusive_queue。主进程可以把任务放到任意一个队列中。现在,在if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0)这行代码中,我最开始使用了self.exclusive_queue.empty(),结果出现了奇怪的情况(qsize()显示30多个,而empty() = True)。

我想说的是,文档中提到的multiprocessing.queues.SimpleQueue

empty() 如果队列为空,返回True;否则返回False。

但没有提到它的可靠性。那么,SimpleQueue.empty() 这个方法可靠吗?

还有,multiprocessing.JoinableQueue是否比Queue更可靠,或者说“更”可靠,因为它有task_done()这个机制?

这样的做法算不算正确,或者通过在子进程之间共享管道端点的回调方式会更合适呢?

1 个回答

7

这不是一个直接的回答,但我发现我越来越依赖于用一个保护条件来循环处理输入队列。在多进程模块的文档中有一个例子:

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

所以当你向队列输入的内容完成后,你只需要向队列里放入和你启动的进程数量一样多的 STOP 字符串,或者你选择的其他保护标识。

撰写回答