我有两个单独的RabbitMQ实例。我正试图找到最好的方式来听这两个事件。
例如,我可以在一个上使用以下事件:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
我还有第二个主持人“host2”,我也想听听。我想创建两个独立的线程来完成这项工作,但从我所读到的内容来看,pika并不是线程安全的。有更好的办法吗?或者创建两个单独的线程,每个线程监听一个不同的Rabbit实例(host1和host2)就足够了吗?
下面是我如何使用一个rabbitmq实例同时侦听两个队列的示例:
“什么是最佳方法”的答案在很大程度上取决于队列的使用模式和您所说的“最佳”是什么意思。既然我还不能对问题发表评论,我就试着提出一些可能的解决办法。
在每个例子中,我假设exchange已经声明了。
螺纹
您可以使用^{} 在单个进程中使用来自独立主机上两个队列的消息。
您是对的,因为its own FAQ states,} 模块在线程中运行此示例,如下所示:
pika
不是线程安全的,但是可以通过为每个线程创建到RabbitMQ主机的连接以多线程方式使用它。使用^{我已经声明了
callback_func
作为打印消息体时纯粹使用ConsumerThread.name
的方法。它也可能是ConsumerThread
类之外的函数。过程
或者,您可以始终只运行一个进程,每个要使用事件的队列使用使用者代码。
然后运行:
如果您对来自队列的消息所做的工作是CPU-heavy,并且只要您的CPU中的内核数=使用者的数量,那么通常最好使用这种方法-除非您的队列大部分时间是空的,并且使用者不会使用此CPU时间*。
异步
另一种选择是使用一些异步框架(例如^{} )并在单个线程中运行整个过程。
您不能再在异步代码中使用
BlockingConnection
;幸运的是,pika
有用于Twisted
的适配器:这种方法会更好,从中消耗的队列越多,用户执行的工作受CPU限制越小*。
Python3
既然您提到了
pika
,我就把自己局限于基于Python 2.x的解决方案,因为pika
还没有移植。但是,如果您希望移到>;=3.3,一个可能的选择是将^{} 与AMQP协议(您与RabbitMQ交谈的协议)之一一起使用,例如^{} 或^{} 。
*-请注意,这些都是非常浅显的提示-在大多数情况下,选择并不是那么明显;什么对您最好取决于队列“饱和”(消息/时间)、接收这些消息后您做了什么工作、在什么环境中运行您的消费者等;除了对所有实现进行基准测试之外,没有其他方法可以确定
相关问题 更多 >
编程相关推荐