在Python/pika中处理多个队列
我正在尝试创建一个消费者,它可以订阅多个队列,然后处理到达的消息。
问题是,当第一个队列里已经有数据时,它会先处理第一个队列的消息,然后就不再去处理第二个队列了。不过,当第一个队列空了之后,它就会去处理下一个队列,这样就能同时处理两个队列的消息。
我最开始是用线程来实现这个功能,但我想避免使用线程,因为pika库可以帮我处理这些事情,而且操作起来不复杂。下面是我的代码:
import pika
mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print body
mq_channel.basic_ack(delivery_tag=method.delivery_tag)
mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
3 个回答
2
和上面第一个答案中的评论类似,我使用 pika 1.1.0 也得到了类似的结果,代码如下:
import pika
def queue1_callback(ch, method, properties, body):
print(" [x] Received queue 1: %r" % body)
def queue2_callback(ch, method, properties, body):
print(" [x] Received queue 2: %r" % body)
def on_open(connection):
connection.channel(on_open_callback = on_channel_open)
def on_channel_open(channel):
channel.basic_consume('queue1', queue1_callback, auto_ack = True)
channel.basic_consume('queue2', queue2_callback, auto_ack = True)
credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)
Try:
connection.ioloop.start()
except KeyboardInterrupt:
connection.close()
connection.ioloop.start()
2
这个问题很可能是因为第一次调用已经开始接收消息了,而这个消息是从一个已经填充好的队列里来的。在第二次调用之前,第一次调用就已经收到了消息。你可以试着把QoS的预取数量设置为1,这样RabbitMQ就不会一次性给你发送超过一条消息。
28
一种可能的解决办法是使用非阻塞连接来接收消息。
import pika
def callback(channel, method, properties, body):
print(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
def on_open(connection):
connection.channel(on_open_callback=on_channel_open)
def on_channel_open(channel):
channel.basic_consume(queue='queue1', on_message_callback=callback)
channel.basic_consume(queue='queue2', on_message_callback=callback)
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
on_open_callback=on_open)
try:
connection.ioloop.start()
except KeyboardInterrupt:
connection.close()
这样可以连接到多个队列,并根据需要接收消息。