使用Pika客户端轮询RabbitMQ消息
我想在Python中创建一个RabbitMQ的接收器/消费者,但不太确定怎么检查消息。我想在自己的循环中实现这个功能,而不是使用pika中的回调。
如果我理解没错,在Java客户端中,我可以使用getBasic()
来检查是否有可用的消息,而不会让程序停下来。我不介意在获取消息时让程序停一下,但我不想等到有消息才停下来。
我找不到清晰的例子,也还没弄明白在pika中对应的调用是什么。
3 个回答
1
队列处理循环可以通过使用 process_data_events()
来以迭代的方式完成:
import pika
# A stubborn callback that still wants to be in the code.
def mq_callback(ch, method, properties, body):
print(" Received: %r" % body)
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
queue_state = channel.queue_declare(queue="test")
# Configure a callback.
channel.basic_consume(mq_callback, queue="test")
try:
# My own loop here:
while(True):
# Do other processing
# Process message queue events, returning as soon as possible.
# Issues mq_callback() when applicable.
connection.process_data_events(time_limit=0)
finally:
connection.close()
1
你可以定期检查队列的大小,参考这个回答的例子 在Pika中获取队列大小(AMQP Python)
1
如果你想要同步处理,那你需要看看Pika的 BlockingConnection
。
BlockingConnection是在Pika的异步核心上创建的一层,它提供了一些方法,这些方法会一直等待,直到你期待的响应返回。由于RabbitMQ到你应用程序的Basic.Deliver和Basic.Return调用是异步的,如果你想通过basic_consume接收RabbitMQ的消息,或者在使用basic_publish时想要知道消息发送失败的情况,你仍然需要实现一种叫做“继续传递风格”的异步方法。
更多信息和示例可以在这里找到
https://pika.readthedocs.org/en/0.9.12/connecting.html#blockingconnection