爱奥皮卡2消费者通过扇出接收的信息不在同一时间

2024-04-25 01:53:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我对RabbitMQ和Pika还不熟悉,但我想我已经清楚地了解了它的工作原理

我需要实现这一点:

Producer创建消息并通过扇出交换发送,多个Producer(测试环境中的2个)接收相同的消息

但每次只有一个消费者收到消息

2019-11-29 19:02:44.167549 b'Hello'-第一消费者

2019-11-29 19:02:45.068192 b'Hello'-第二消费者

制作人:

    async def main(loop):
        connection = await connect_robust(
            "amqp://guest:guest@192.168.1.3/", loop=loop
        )

        queue_name = "test_queue"
        routing_key = "test_queue"

        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange('test_exchange',
                                                  ExchangeType.FANOUT, auto_delete=True
                                                  )

        # Declaring queue
        queue = await channel.declare_queue(
            queue_name, auto_delete=True
        )

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes('Hello', 'utf-8'),
                content_type='text/plain',
                headers={'foo': 'bar'}
            ),
            routing_key
        )
    )


    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))

消费者:

    async def main(loop):
            connection = await aio_pika.connect_robust(host='192.168.1.3', login='guest', password='guest', loop=loop
                                                       )

            queue_name = "test_queue"

            async with connection:
                # Creating channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue(
                    queue_name, auto_delete=True
                )

                async with queue.iterator() as queue_iter:
                    async for message in queue_iter:
                        async with message.process():
                            print(datetime.datetime.now(), message.body)

                            if queue.name in message.body.decode():
                                break


    if __name__ == "__main__":
          loop = asyncio.get_event_loop()
          loop.run_until_complete(main(loop))
          loop.close()

Tags: nametestloop消息messagehelloasyncexchange
1条回答
网友
1楼 · 发布于 2024-04-25 01:53:42

首先,我假设您正在运行两个独立的使用者进程

每个使用者都应该将自己的队列绑定到扇出交换。不要使用共享队列。一种解决方案是让每个消费者使用独占队列

生产者不需要创建队列并将其绑定到fanout交换,只要您的消费者首先开始

先试试这个。然后,如果您需要考虑生产者可以首先启动,则必须创建两个具有已知名称的队列,并绑定它们。消费者开始时也应该这样做


注意:RabbitMQ团队监视rabbitmq-usersmailing list,有时只回答有关StackOverflow的问题。

相关问题 更多 >