在queue.Queue上进行多路复用?

12 投票
4 回答
4712 浏览
提问于 2025-04-17 08:06

我该如何同时在多个queue.Queue上进行“选择”操作呢?

在Golang中,有一个很不错的功能,它通过通道实现:

select {
case i1 = <-c1:
    print("received ", i1, " from c1\n")
case c2 <- i2:
    print("sent ", i2, " to c2\n")
case i3, ok := (<-c3):  // same as: i3, ok := <-c3
    if ok {
        print("received ", i3, " from c3\n")
    } else {
        print("c3 is closed\n")
    }
default:
    print("no communication\n")
}

在这里,第一个被解锁的通道会执行相应的代码块。我该如何在Python中实现这个功能呢?

更新0

根据这个链接,在tux21b的回答中提到,所需的队列类型具有以下特性:

  • 多生产者/多消费者队列(MPMC)
  • 为每个生产者提供先进先出/后进先出
  • 当队列为空或满时,消费者/生产者会被阻塞

此外,通道可以是阻塞的,生产者会一直阻塞,直到消费者取走项目。我不确定Python的Queue是否能做到这一点。

4 个回答

2

pychan这个项目在Python中实现了类似Go语言的通道功能,包括多路复用。它使用和Go语言相同的算法,所以能满足你想要的所有特性:

  • 多个生产者和消费者可以通过一个通道进行交流。当生产者和消费者都准备好时,他们会互相等待。
  • 生产者和消费者会按照到达的顺序进行处理(先进先出)。
  • 如果队列是空的,消费者会被阻塞;如果队列是满的,生产者会被阻塞。

下面是你的示例代码的样子:

c1 = Chan(); c2 = Chan(); c3 = Chan()

try:
    chan, value = chanselect([c1, c3], [(c2, i2)])
    if chan == c1:
        print("Received %r from c1" % value)
    elif chan == c2:
        print("Sent %r to c2" % i2)
    else:  # c3
        print("Received %r from c3" % value)
except ChanClosed as ex:
    if ex.which == c3:
        print("c3 is closed")
    else:
        raise

(坦白说:这个库是我写的)

3

如果你使用 queue.PriorityQueue,你可以通过将通道对象作为优先级来实现类似的行为:

import threading, logging
import random, string, time
from queue import PriorityQueue, Empty
from contextlib import contextmanager

logging.basicConfig(level=logging.NOTSET,
                    format="%(threadName)s - %(message)s")

class ChannelManager(object):
    next_priority = 0

    def __init__(self):
        self.queue = PriorityQueue()
        self.channels = []

    def put(self, channel, item, *args, **kwargs):
        self.queue.put((channel, item), *args, **kwargs)

    def get(self, *args, **kwargs):
        return self.queue.get(*args, **kwargs)

    @contextmanager
    def select(self, ordering=None, default=False):
        if default:
            try:
                channel, item = self.get(block=False)
            except Empty:
                channel = 'default'
                item = None
        else:
            channel, item = self.get()
        yield channel, item


    def new_channel(self, name):
        channel = Channel(name, self.next_priority, self)
        self.channels.append(channel)
        self.next_priority += 1
        return channel


class Channel(object):
    def __init__(self, name, priority, manager):
        self.name = name
        self.priority = priority
        self.manager = manager

    def __str__(self):
        return self.name

    def __lt__(self, other):
        return self.priority < other.priority

    def put(self, item):
        self.manager.put(self, item)


if __name__ == '__main__':
    num_channels = 3
    num_producers = 4
    num_items_per_producer = 2
    num_consumers = 3
    num_items_per_consumer = 3

    manager = ChannelManager()
    channels = [manager.new_channel('Channel#{0}'.format(i))
                for i in range(num_channels)]

    def producer_target():
        for i in range(num_items_per_producer):
            time.sleep(random.random())
            channel = random.choice(channels)
            message = random.choice(string.ascii_letters)
            logging.info('Putting {0} in {1}'.format(message, channel))
            channel.put(message)

    producers = [threading.Thread(target=producer_target,
                                  name='Producer#{0}'.format(i))
                 for i in range(num_producers)]
    for producer in producers:
        producer.start()
    for producer in producers:
        producer.join()
    logging.info('Producers finished')

    def consumer_target():
        for i in range(num_items_per_consumer):
            time.sleep(random.random())
            with manager.select(default=True) as (channel, item):
                if channel:
                    logging.info('Received {0} from {1}'.format(item, channel))
                else:
                    logging.info('No data received')

    consumers = [threading.Thread(target=consumer_target,
                                  name='Consumer#{0}'.format(i))
                 for i in range(num_consumers)]
    for consumer in consumers:
        consumer.start()
    for consumer in consumers:
        consumer.join()
    logging.info('Consumers finished')

示例输出:

Producer#0 - Putting x in Channel#2
Producer#2 - Putting l in Channel#0
Producer#2 - Putting A in Channel#2
Producer#3 - Putting c in Channel#0
Producer#3 - Putting z in Channel#1
Producer#1 - Putting I in Channel#1
Producer#1 - Putting L in Channel#1
Producer#0 - Putting g in Channel#1
MainThread - Producers finished
Consumer#1 - Received c from Channel#0
Consumer#2 - Received l from Channel#0
Consumer#0 - Received I from Channel#1
Consumer#0 - Received L from Channel#1
Consumer#2 - Received g from Channel#1
Consumer#1 - Received z from Channel#1
Consumer#0 - Received A from Channel#2
Consumer#1 - Received x from Channel#2
Consumer#2 - Received None from default
MainThread - Consumers finished

在这个例子中,ChannelManager 只是一个包装器,它围绕着 queue.PriorityQueue,并实现了 select 方法,作为一个 contextmanager,让它看起来像 Go 语言中的 select 语句。

这里有几点需要注意:

  • 顺序

    • 在 Go 的例子中,select 语句中通道的书写顺序决定了如果多个通道都有数据可用,哪个通道的代码会被执行。

    • 在 Python 的例子中,顺序是由分配给每个通道的优先级决定的。不过,优先级可以动态分配给每个通道(如示例所示),所以通过一个更复杂的 select 方法,可以根据方法的参数来改变顺序。同时,一旦上下文管理器结束,旧的顺序也可以恢复。

  • 阻塞

    • 在 Go 的例子中,如果存在 default 情况,select 语句会阻塞。

    • 在 Python 的例子中,必须向 select 方法传递一个布尔值参数,以明确何时需要阻塞或非阻塞。在非阻塞的情况下,上下文管理器返回的通道只是字符串 'default',这样在 with 语句内部的代码中就很容易检测到这一点。

  • 线程:queue 模块中的对象已经准备好用于多生产者、多消费者的场景,这在示例中已经看到了。

2

生产者-消费者队列有很多不同的实现方式,比如 queue.Queue。这些实现通常在很多特性上有所不同,具体可以参考这篇 很棒的文章,作者是Dmitry Vyukov。正如你所看到的,可能有超过一万种不同的组合。用于这些队列的算法也根据需求的不同而差异很大。想要在现有的队列算法上扩展以保证额外的特性是不可能的,因为这通常需要不同的内部数据结构和算法。

Go语言的通道提供了相对较多的保证特性,因此这些通道可能适合很多程序。其中一个比较难的要求是支持同时读取/阻塞多个通道(使用select语句),并且在多个分支都能继续的情况下公平地选择一个通道,以确保没有消息被遗漏。而Python的 queue.Queue 并不提供这些功能,因此用它无法实现相同的行为。

所以,如果你想继续使用 queue.Queue,你需要找到解决这个问题的变通方法。不过,这些变通方法也有自己的缺点,并且维护起来比较麻烦。寻找另一个提供你所需特性的生产者-消费者队列可能是个更好的主意!无论如何,这里有两个可能的变通方法:

轮询

while True:
  try:
    i1 = c1.get_nowait()
    print "received %s from c1" % i1
  except queue.Empty:
    pass
  try:
    i2 = c2.get_nowait()
    print "received %s from c2" % i2
  except queue.Empty:
    pass
  time.sleep(0.1)

这种方法在轮询通道时可能会消耗很多CPU资源,当消息很多时可能会很慢。使用time.sleep()并结合指数退避时间(而不是这里显示的固定0.1秒)可能会大大改善这个版本的性能。

单一通知队列

queue_id = notify.get()
if queue_id == 1:
  i1 = c1.get()
  print "received %s from c1" % i1
elif queue_id == 2:
  i2 = c2.get()
  print "received %s from c2" % i2

在这种设置下,你必须在向c1或c2发送消息后,向通知队列发送一些东西。如果你只需要一个这样的通知队列,这种方法可能对你有效(也就是说,你没有多个“选择”,每个都在不同的通道子集上阻塞)。

另外,你也可以考虑使用Go语言。Go的协程和并发支持比Python有限的线程能力要强大得多。

撰写回答