一次消费多个消息

10 投票
2 回答
6821 浏览
提问于 2025-04-18 07:56

我正在使用一个外部服务(我们称之为Service)来处理一些特定类型的对象。这个服务处理对象的速度更快,如果我一次发送10个对象。现在我的架构是这样的:一个生产者一个一个地发送对象,而一群消费者从队列中一个一个地取出这些对象,然后把它们发送给Service。这种方式显然不是最优的。

我不想修改生产者的代码,因为它可以在不同的情况下使用。我可以修改消费者的代码,但这样会增加额外的复杂性。我也知道有一个叫做prefetch_count的选项,但我觉得它只在网络层面上起作用——客户端库(pika)不允许在消费者回调中一次获取多个消息。

所以,RabbitMQ能否在发送消息给消费者之前将它们打包成批次?我在寻找一个类似于“每次消费n条消息”的选项。

2 个回答

0

下面的代码会使用 channel.consume 来开始接收消息。当接收到的消息数量达到我们想要的数量时,就会停止。

我设置了一个 batch_size,这样可以避免一次性拉取太多消息。你可以根据自己的需要随时调整 batch_size 的大小。

    def consume_messages(queue_name: str):
    msgs = list([])
    batch_size = 500

    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    q_length = q.method.message_count
    
    if not q_length:
        return msgs

    msgs_limit = batch_size if q_length > batch_size else q_length

    try:
        # Get messages and break out
        for method_frame, properties, body in channel.consume(queue_name):

            # Append the message
            try:
                msgs.append(json.loads(bytes.decode(body)))
            except:
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")

            # Acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

            # Escape out of the loop when desired msgs are fetched
            if method_frame.delivery_tag == msgs_limit:

                # Cancel the consumer and return any pending messages
                requeued_messages = channel.cancel()
                print('Requeued %i messages' % requeued_messages)
                break

    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')

    finally:
        # Close the channel and the connection
        channel.stop_consuming()
        channel.close()

    return msgs
4

在消费者的回调中,你不能批量处理消息,但你可以使用线程安全的库,利用多个线程来获取数据。这样做的好处是,你可以在五个不同的线程上同时获取五条消息,如果需要的话,还可以把这些数据合并在一起。

举个例子,你可以看看我是如何使用我的AMQP库来实现这个的。https://github.com/eandersson/amqpstorm/blob/master/examples/scalable_consumer.py

撰写回答