Gevent线程未完成即使队列项已耗尽
我正在尝试在Gevent中设置一个简单的生产者-消费者系统,但我的脚本无法正常退出:
import gevent
from gevent.queue import *
import time
import random
q = Queue()
workers = []
def do_work(wid, value):
"""
Actual blocking function
"""
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
return
def worker(wid):
"""
Consumer
"""
while True:
item = q.get()
do_work(wid, item)
def producer():
"""
Producer
"""
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
for item in range(1, 9):
q.put(item)
producer()
gevent.joinall(workers)
我没有找到好的示例或教程来使用Gevent,所以我上面贴的代码是我从网上拼凑而来的。
多个工作线程被激活,物品被放入队列中,但即使队列里的所有东西都处理完了,主程序还是不退出。我必须按CTRL ^ C
来强制结束。
我哪里做错了呢?
谢谢。
顺便说一下,如果我的脚本有什么可以改进的地方,请告诉我。比如检查队列是否为空之类的简单事情。
2 个回答
2
在你的工作程序中,你启动了一个会一直运行的循环。
顺便说一下,有一种更优雅的“无限循环”写法,可以简单地用:
for work_unit in q:
# Do work, etc
gevent.joinall() 会等待所有工作程序完成;但因为它们永远不会完成,所以你的程序会一直在等待。这就是为什么它无法退出的原因。
如果你不再关心这些工作程序了,你可以直接把它们杀掉:
gevent.killall(workers)
另一种方法是在队列中放一个“特殊”的项目。当一个工作程序收到这个项目时,它会意识到这和正常工作不一样,然后停止工作。
for worker in workers:
q.put("TimeToDie")
for work_unit in q:
if work_unint == "TimeToDie":
break
do_work()
或者你也可以使用 gevent 的 Event 来实现这种模式。
5
我觉得你应该使用JoinableQueue
,就像文档中的例子那样。
import gevent
from gevent.queue import *
import time
import random
q = JoinableQueue()
workers = []
def do_work(wid, value):
gevent.sleep(random.randint(0,2))
print 'Task', value, 'done', wid
def worker(wid):
while True:
item = q.get()
try:
do_work(wid, item)
finally:
q.task_done()
def producer():
for i in range(4):
workers.append(gevent.spawn(worker, random.randint(1, 100000)))
for item in range(1, 9):
q.put(item)
producer()
q.join()