使用Pika客户端轮询RabbitMQ消息

2 投票
3 回答
3858 浏览
提问于 2025-04-17 17:27

我想在Python中创建一个RabbitMQ的接收器/消费者,但不太确定怎么检查消息。我想在自己的循环中实现这个功能,而不是使用pika中的回调。

如果我理解没错,在Java客户端中,我可以使用getBasic()来检查是否有可用的消息,而不会让程序停下来。我不介意在获取消息时让程序停一下,但我不想等到有消息才停下来。

我找不到清晰的例子,也还没弄明白在pika中对应的调用是什么。

3 个回答

1

队列处理循环可以通过使用 process_data_events() 来以迭代的方式完成:

import pika

# A stubborn callback that still wants to be in the code.
def mq_callback(ch, method, properties, body):
    print(" Received: %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
queue_state = channel.queue_declare(queue="test")
# Configure a callback.
channel.basic_consume(mq_callback, queue="test")

try:
    # My own loop here:
    while(True):
        # Do other processing

        # Process message queue events, returning as soon as possible.
        # Issues mq_callback() when applicable.
        connection.process_data_events(time_limit=0)
finally:
    connection.close()
1

你可以定期检查队列的大小,参考这个回答的例子 在Pika中获取队列大小(AMQP Python)

1

如果你想要同步处理,那你需要看看Pika的 BlockingConnection

BlockingConnection是在Pika的异步核心上创建的一层,它提供了一些方法,这些方法会一直等待,直到你期待的响应返回。由于RabbitMQ到你应用程序的Basic.Deliver和Basic.Return调用是异步的,如果你想通过basic_consume接收RabbitMQ的消息,或者在使用basic_publish时想要知道消息发送失败的情况,你仍然需要实现一种叫做“继续传递风格”的异步方法。

更多信息和示例可以在这里找到

https://pika.readthedocs.org/en/0.9.12/connecting.html#blockingconnection

撰写回答