一次消费多个消息
我正在使用一个外部服务(我们称之为Service)来处理一些特定类型的对象。这个服务处理对象的速度更快,如果我一次发送10个对象。现在我的架构是这样的:一个生产者一个一个地发送对象,而一群消费者从队列中一个一个地取出这些对象,然后把它们发送给Service。这种方式显然不是最优的。
我不想修改生产者的代码,因为它可以在不同的情况下使用。我可以修改消费者的代码,但这样会增加额外的复杂性。我也知道有一个叫做prefetch_count
的选项,但我觉得它只在网络层面上起作用——客户端库(pika)不允许在消费者回调中一次获取多个消息。
所以,RabbitMQ能否在发送消息给消费者之前将它们打包成批次?我在寻找一个类似于“每次消费n条消息”的选项。
2 个回答
0
下面的代码会使用 channel.consume
来开始接收消息。当接收到的消息数量达到我们想要的数量时,就会停止。
我设置了一个 batch_size
,这样可以避免一次性拉取太多消息。你可以根据自己的需要随时调整 batch_size
的大小。
def consume_messages(queue_name: str):
msgs = list([])
batch_size = 500
q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
q_length = q.method.message_count
if not q_length:
return msgs
msgs_limit = batch_size if q_length > batch_size else q_length
try:
# Get messages and break out
for method_frame, properties, body in channel.consume(queue_name):
# Append the message
try:
msgs.append(json.loads(bytes.decode(body)))
except:
logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")
# Acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop when desired msgs are fetched
if method_frame.delivery_tag == msgs_limit:
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
break
except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
logger.info(f'Connection Interrupted: {str(e)}')
finally:
# Close the channel and the connection
channel.stop_consuming()
channel.close()
return msgs
4
在消费者的回调中,你不能批量处理消息,但你可以使用线程安全的库,利用多个线程来获取数据。这样做的好处是,你可以在五个不同的线程上同时获取五条消息,如果需要的话,还可以把这些数据合并在一起。
举个例子,你可以看看我是如何使用我的AMQP库来实现这个的。https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py