Gevent线程未完成即使队列项已耗尽

6 投票
2 回答
4787 浏览
提问于 2025-04-17 12:35

我正在尝试在Gevent中设置一个简单的生产者-消费者系统,但我的脚本无法正常退出:

import gevent
from gevent.queue import *
import time
import random

q = Queue()
workers = []

def do_work(wid, value):
    """
    Actual blocking function
    """
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid
    return


def worker(wid):
    """
    Consumer
    """
    while True:
        item = q.get()
        do_work(wid, item)


def producer():
    """
    Producer
    """
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))


    for item in range(1, 9):
         q.put(item)

producer()
gevent.joinall(workers)

我没有找到好的示例或教程来使用Gevent,所以我上面贴的代码是我从网上拼凑而来的。

多个工作线程被激活,物品被放入队列中,但即使队列里的所有东西都处理完了,主程序还是不退出。我必须按CTRL ^ C来强制结束。

我哪里做错了呢?

谢谢。

顺便说一下,如果我的脚本有什么可以改进的地方,请告诉我。比如检查队列是否为空之类的简单事情。

2 个回答

2

在你的工作程序中,你启动了一个会一直运行的循环。

顺便说一下,有一种更优雅的“无限循环”写法,可以简单地用:

for work_unit in q:
    # Do work, etc

gevent.joinall() 会等待所有工作程序完成;但因为它们永远不会完成,所以你的程序会一直在等待。这就是为什么它无法退出的原因。

如果你不再关心这些工作程序了,你可以直接把它们杀掉:

gevent.killall(workers)

另一种方法是在队列中放一个“特殊”的项目。当一个工作程序收到这个项目时,它会意识到这和正常工作不一样,然后停止工作。

for worker in workers:
    q.put("TimeToDie")

for work_unit in q:
    if work_unint == "TimeToDie":
        break
    do_work()

或者你也可以使用 gevent 的 Event 来实现这种模式。

5

我觉得你应该使用JoinableQueue,就像文档中的例子那样。

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            do_work(wid, item)
        finally:
            q.task_done()


def producer():
    for i in range(4):
        workers.append(gevent.spawn(worker, random.randint(1, 100000)))

    for item in range(1, 9):
         q.put(item)

producer()
q.join()

撰写回答