有没有方法定义一个RabbitMQ消费者,使其在不阻塞程序的情况下持续监听队列?

2 投票
1 回答
42 浏览
提问于 2025-04-14 17:12

我正在做一个项目,使用Python和RabbitMq。最开始,管理员会验证一个订单,然后把它发布到RabbitMq的消息队列“verified_orders”中。我想要一个消费者,它可以持续监听这个消息队列,消费一个订单,并把详细信息发送回客户端,但又不想让整个程序都卡住。

我看过一些RabbitMQ的教程,里面有这样的代码:假设已经建立了与RabbitMQ的连接。

发布消息的代码:

channel.basic_publish(exchange='',
                      routing_key='verified_orders',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")

消费消息的代码:

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")

channel.basic_consume(queue='verified_orders',
                      auto_ack=True,
                      on_message_callback=callback)

channel.start_consuming()

当执行start_consuming()时,它会无限期地等待数据并运行回调函数,但这会让程序进入一个阻塞状态,而我正想避免这种情况。

我找到的另一种方法是定义一个带有无限循环的异步函数来消费消息,但我觉得使用无限循环并不是一个好的方法。

channel.basic_get(queue='verified_orders', auto_ack=True)

1 个回答

0

如果你想写一个不阻塞的程序,就必须使用并发。实现并发有两种方法,一种是用 多线程,另一种是 异步编程

异步方法更优,因为你可以在一个线程里通过无限循环来完成你的目标。而且,当你处理的是输入输出(IO)相关的任务时,比如连接、消费或发布消息到消息队列(MQ),就应该使用 async

根据我对你代码的理解,你在使用 pika 这个库。这个库是同步的。你可以用 aiopika 来代替,它提供了所有 Rabbitmq 的异步功能。

撰写回答