如何在调用task_done之前向JoinableQueue添加项目?
我创建了一个可加入的队列(JoinableQueue)实例,并往里面添加了一些项目。接着,我又创建了一些消费者工作者来处理这些队列里的项目。
但是问题是,我需要在这些工作者里添加新的项目到队列中。不过,当我调用了q.get()之后,如果没有调用task_done(),Gevent就不允许我进行这个操作。如果我在调用task_done()之后再添加新项目到队列里,我就不能保证所有的项目都被处理完了。
这时会出现一个错误:gevent.hub.LoopExit: This operation would block forever
q = JoinableQueue()
def worker():
item = q.get()
newItems = consumeItem(item)
[q.put(newItem) for newItem in newItems]
q.task_done()
for item in initialItems:
q.put(item)
for i in range(10):
gevent.spawn(worker)
q.join() # I have to be sure all items are consumed when join stops blocking the program.
我该如何解决这个问题呢?
1 个回答
0
你的工作者只运行了一次。如果你的队列里的项目比你启动的工作者还多,它就永远不会完成。
解决办法很简单,只需要给工作者加一个循环:
def worker():
for item in iter(q.get, None):
newItems = consumeItem(item)
[q.put(newItem) for newItem in newItems]
q.task_done()
这样它们就会一直工作,直到队列里的项目处理完毕。