如何使用Pika发送和接收RabbitMQ消息?
我在使用Pika的时候遇到了一些问题,特别是在处理路由键或交换机时,感觉和AMQP或RabbitMQ的文档不太一致。我知道RabbitMQ的文档用的是Pika的旧版本,所以我没有参考他们的示例代码。
我想做的是定义一个叫“order”的队列,并且有两个消费者,一个处理交换机或路由键“production”,另一个处理“test”。根据RabbitMQ的文档,这应该很简单,可以通过使用直接交换机和路由键,或者使用主题交换机来实现。
但是,Pika似乎不知道该如何处理这些交换机和路由键。通过RabbitMQ的管理工具查看队列时,很明显Pika要么没有正确地将消息放入队列,要么RabbitMQ直接把它丢掉了。
在消费者那边,我不太清楚应该如何将消费者绑定到交换机上,或者如何处理路由键,而文档也没有提供太多帮助。
如果我不考虑交换机和路由键,消息就能顺利排队,并且我的消费者也能轻松处理它们。
如果有人能提供一些指引或示例代码,那就太好了。
2 个回答
1
虽然西蒙的回答总体上看起来是对的,但你可能需要交换一下参数来进行消费。
channel.basic_consume(queue='test', on_message_callback=handle_delivery)
基本的设置大概是这样的:
credentials = pika.PlainCredentials("some_user", "some_password")
parameters = pika.ConnectionParameters(
"some_host.domain.tld", 5672, "some_vhost", credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
要开始消费的话:
channel.start_consuming()
14
原来,我对AMQP的理解还不够全面。
大致的意思是这样的:
客户端:
客户端在建立连接后,只需要关注交换机的名字和路由键,其他的事情就不用管了。也就是说,我们并不知道消息最终会被放到哪个队列里。
channel.basic_publish(exchange='order',
routing_key="order.test.customer",
body=pickle.dumps(data),
properties=pika.BasicProperties(
content_type="text/plain",
delivery_mode=2))
消费者
当通道打开时,我们需要声明交换机和队列。
channel.exchange_declare(exchange='order',
type="topic",
durable=True,
auto_delete=False)
channel.queue_declare(queue="test",
durable=True,
exclusive=False,
auto_delete=False,
callback=on_queue_declared)
当队列准备好后,在“on_queue_declared”这个回调里,我们可以把队列和交换机绑定起来,使用我们想要的路由键。
channel.queue_bind(queue='test',
exchange='order',
routing_key='order.test.customer')
#handle_delivery is the callback that will actually pickup and handle messages
#from the "test" queue
channel.basic_consume(handle_delivery, queue='test')
发送到“order”交换机的消息,如果使用路由键“order.test.customer”,那么这些消息就会被路由到“test”队列,消费者就可以从那里取到这些消息。