如何在调用task_done之前向JoinableQueue添加项目?

1 投票
1 回答
513 浏览
提问于 2025-04-18 02:21

我创建了一个可加入的队列(JoinableQueue)实例,并往里面添加了一些项目。接着,我又创建了一些消费者工作者来处理这些队列里的项目。

但是问题是,我需要在这些工作者里添加新的项目到队列中。不过,当我调用了q.get()之后,如果没有调用task_done(),Gevent就不允许我进行这个操作。如果我在调用task_done()之后再添加新项目到队列里,我就不能保证所有的项目都被处理完了。

这时会出现一个错误:gevent.hub.LoopExit: This operation would block forever

q = JoinableQueue()

def worker():
    item = q.get()
    newItems = consumeItem(item)
    [q.put(newItem) for newItem in newItems]
    q.task_done()

for item in initialItems:
    q.put(item)

for i in range(10):
    gevent.spawn(worker)

q.join() # I have to be sure all items are consumed when join stops blocking the program.

我该如何解决这个问题呢?

1 个回答

0

你的工作者只运行了一次。如果你的队列里的项目比你启动的工作者还多,它就永远不会完成。

解决办法很简单,只需要给工作者加一个循环:

def worker():
    for item in iter(q.get, None):
        newItems = consumeItem(item)
        [q.put(newItem) for newItem in newItems]
        q.task_done()

这样它们就会一直工作,直到队列里的项目处理完毕。

撰写回答