如何使用Pika发送和接收RabbitMQ消息?

6 投票
2 回答
11461 浏览
提问于 2025-04-16 14:31

我在使用Pika的时候遇到了一些问题,特别是在处理路由键或交换机时,感觉和AMQP或RabbitMQ的文档不太一致。我知道RabbitMQ的文档用的是Pika的旧版本,所以我没有参考他们的示例代码。

我想做的是定义一个叫“order”的队列,并且有两个消费者,一个处理交换机或路由键“production”,另一个处理“test”。根据RabbitMQ的文档,这应该很简单,可以通过使用直接交换机和路由键,或者使用主题交换机来实现。

但是,Pika似乎不知道该如何处理这些交换机和路由键。通过RabbitMQ的管理工具查看队列时,很明显Pika要么没有正确地将消息放入队列,要么RabbitMQ直接把它丢掉了。

在消费者那边,我不太清楚应该如何将消费者绑定到交换机上,或者如何处理路由键,而文档也没有提供太多帮助。

如果我不考虑交换机和路由键,消息就能顺利排队,并且我的消费者也能轻松处理它们。

如果有人能提供一些指引或示例代码,那就太好了。

2 个回答

1

虽然西蒙的回答总体上看起来是对的,但你可能需要交换一下参数来进行消费。

channel.basic_consume(queue='test', on_message_callback=handle_delivery) 

基本的设置大概是这样的:

credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
    "some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

要开始消费的话:

channel.start_consuming()
14

原来,我对AMQP的理解还不够全面。

大致的意思是这样的:

客户端

客户端在建立连接后,只需要关注交换机的名字和路由键,其他的事情就不用管了。也就是说,我们并不知道消息最终会被放到哪个队列里。

channel.basic_publish(exchange='order',
                      routing_key="order.test.customer",
                      body=pickle.dumps(data),
                      properties=pika.BasicProperties(
                          content_type="text/plain",
                          delivery_mode=2))

消费者

当通道打开时,我们需要声明交换机和队列。

channel.exchange_declare(exchange='order', 
                         type="topic", 
                         durable=True, 
                         auto_delete=False)

channel.queue_declare(queue="test", 
                      durable=True, 
                      exclusive=False, 
                      auto_delete=False, 
                      callback=on_queue_declared)

当队列准备好后,在“on_queue_declared”这个回调里,我们可以把队列和交换机绑定起来,使用我们想要的路由键。

channel.queue_bind(queue='test', 
                   exchange='order', 
                   routing_key='order.test.customer')

#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test') 

发送到“order”交换机的消息,如果使用路由键“order.test.customer”,那么这些消息就会被路由到“test”队列,消费者就可以从那里取到这些消息。

撰写回答