Python Kombu - 阻塞
我正在使用kombu来管理RabbitMQ,采用生产者/消费者的模式。我启动了我的生产者,它在队列中放了100个任务(我只有一个队列和一个交换机)。我想同时启动多个消费者,让每个消费者一次处理一个任务。可惜的是,消费者之间互相阻塞(也就是说,当一个消费者从队列中取出一个任务时,其他消费者就只能闲着)。如果我结束正在工作的消费者,那么其他消费者中的一个就会开始工作。有没有办法让所有消费者同时运行,每个消费者处理队列中的不同任务呢?我的消费者代码如下:
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message
2 个回答
-1
我觉得你其实是在尝试自己重写Celery:
除非你只是为了学习,不然就别折腾自己了,直接用Celery吧。顺便提一下,kombu
和RabbitMQ
正是Celery用来做后台的(还有Redis这个后台也可以用,这让我在一些应用中省了不少时间)。
5
你需要把消费者的预取设置为1(https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos),这样每个消费者就只会拿到1条消息,其他的消息会留在队列里,状态是准备好的。比如说,如果你有2个消费者,预取设置为1,而你有100条消息,那么你就会同时处理2个任务。
我已经在你的代码中添加了缺失的部分,以设置预取数量。
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
channel = self.rabbitmq_connection.channel()
channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message