Python多进程队列的可靠性,Queue与SimpleQueue与JoinableQueue的对比
直接引用Python的文档:
类 multiprocessing.Queue([maxsize])
...
qsize() 返回队列的大致大小。由于多线程/多进程的特性,这个数字并不可靠。
empty() 如果队列为空,返回True;否则返回False。由于多线程/多进程的特性,这个结果也不可靠。
我在实际使用中发现,这些说法对于Queue
来说非常准确,尤其是empty()
这个方法。
在我的代码里,有很多进程(每个都是同一个主进程的子进程),它们在各自的run
方法中都有以下内容:
while self.active:
if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0):
try:
self.exclusive_queue.put(self.general_queue.get(timeout=self.queue_timeout))
except Queue.Empty as empty_queue:
continue
else:
task = self.exclusive_queue.get()
self.compute(task)
基本上,这个进程会在general_queue
上等待工作,但首先会检查它的exclusive_queue
。主进程可以把任务放到任意一个队列中。现在,在if(self.exclusive_queue.empty() and self.exclusive_queue.qsize() == 0)
这行代码中,我最开始使用了self.exclusive_queue.empty()
,结果出现了奇怪的情况(qsize()
显示30多个,而empty() = True
)。
我想说的是,文档中提到的multiprocessing.queues.SimpleQueue
:
empty() 如果队列为空,返回True;否则返回False。
但没有提到它的可靠性。那么,SimpleQueue.empty() 这个方法可靠吗?
还有,multiprocessing.JoinableQueue
是否比Queue
更可靠,或者说“更”可靠,因为它有task_done()
这个机制?
这样的做法算不算正确,或者通过在子进程之间共享管道端点的回调方式会更合适呢?
1 个回答
这不是一个直接的回答,但我发现我越来越依赖于用一个保护条件来循环处理输入队列。在多进程模块的文档中有一个例子:
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
所以当你向队列输入的内容完成后,你只需要向队列里放入和你启动的进程数量一样多的 STOP
字符串,或者你选择的其他保护标识。