在Python/pika中处理多个队列

45 投票
3 回答
22703 浏览
提问于 2025-04-18 11:43

我正在尝试创建一个消费者,它可以订阅多个队列,然后处理到达的消息。

问题是,当第一个队列里已经有数据时,它会先处理第一个队列的消息,然后就不再去处理第二个队列了。不过,当第一个队列空了之后,它就会去处理下一个队列,这样就能同时处理两个队列的消息。

我最开始是用线程来实现这个功能,但我想避免使用线程,因为pika库可以帮我处理这些事情,而且操作起来不复杂。下面是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()

3 个回答

2

和上面第一个答案中的评论类似,我使用 pika 1.1.0 也得到了类似的结果,代码如下:

import pika

def queue1_callback(ch, method, properties, body):
  print(" [x] Received queue 1: %r" % body)

def queue2_callback(ch, method, properties, body):
  print(" [x] Received queue 2: %r" % body)

def on_open(connection):
  connection.channel(on_open_callback = on_channel_open)


def on_channel_open(channel):
  channel.basic_consume('queue1', queue1_callback, auto_ack = True)
  channel.basic_consume('queue2', queue2_callback, auto_ack = True)

credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)

Try:
  connection.ioloop.start()
except KeyboardInterrupt:
  connection.close()
  connection.ioloop.start()
2

这个问题很可能是因为第一次调用已经开始接收消息了,而这个消息是从一个已经填充好的队列里来的。在第二次调用之前,第一次调用就已经收到了消息。你可以试着把QoS的预取数量设置为1,这样RabbitMQ就不会一次性给你发送超过一条消息。

28

一种可能的解决办法是使用非阻塞连接来接收消息。

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

这样可以连接到多个队列,并根据需要接收消息。

撰写回答