如何在Gevent中实现多生产者多消费者模式?

5 投票
4 回答
2300 浏览
提问于 2025-04-17 12:40

我有一些生产者函数,它们依赖于需要大量输入输出的阻塞调用,还有一些消费者函数也依赖于这些阻塞调用。为了加快它们的速度,我使用了Gevent这个微线程库来连接它们。

我的整体结构大概是这样的:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []

def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid

def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)



for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

#This doesnt work.
for j in range(2):
    producers.append(gevent.spawn(producer))

#Uncommenting this makes this script work.
#producer()

q.join()

我有四个消费者,想要有两个生产者。当生产者接收到一个信号,比如10时,它们就会退出。消费者会不断从这个队列中获取数据,整个任务在生产者和消费者都完成后结束。

但是,这个方法不奏效。如果我把那个生成多个生产者的for循环注释掉,只用一个生产者,脚本就能正常运行。

我似乎搞不清楚我哪里出错了。

有什么想法吗?

谢谢

4 个回答

0

你想要做的是在生产者和工作者之间进行交流时,暂停主程序的运行。也就是说,程序会在队列上等待,直到队列里的东西都处理完了,然后再继续运行,这个过程可能会很快就完成。把这个放在你程序的最后,而不是用q.join()

gevent.joinall(producers)
3

我觉得在往队列里放东西之前,它会先执行 q.join(),然后就立刻退出了。你可以试着在加入队列之前,先让所有的生产者都完成。

6

其实,当队列里没有未完成的工作时,你并不想直接退出,因为从概念上讲,这并不是应用程序应该结束的时刻。

你应该在生产者完成工作后,再检查一下是否还有未完成的工作,这时候再决定是否退出。

# Wait for all producers to finish producing
gevent.joinall(producers)
# *Now* we want to make sure there's no unfinished work
q.join()
# We don't care about workers. We weren't paying them anything, anyways
gevent.killall(workers)
# And, we're done.

撰写回答