如何在AMQP的Python客户端中监听basic.return

2 投票
3 回答
1410 浏览
提问于 2025-04-15 18:50

我想确认我的消息是否成功发送到了队列。

为此,我在基本发布时添加了一个必须的参数。还有什么其他方法可以让我收到 basic.return 消息,以便知道我的消息没有成功发送吗?

我不能使用 channel.wait() 来监听 basic.return,因为当我的消息成功发送时,wait() 函数会一直卡在那里,根本不会结束。(没有超时设置)另一方面,如果我不调用 channel.wait(),那么即使消息没有发送成功,channel.returned_messages 也会一直是空的。

我使用的是 py-amqplib 版本 0.6。

任何解决方案都欢迎。

3 个回答

1

你不能同步地这样做,因为这是一个异步系统。不过,你可以通过使用线程来解决这个问题。

基本的思路是,你启动一个线程,这个线程会在通道上等待。当它从等待中出来时,它会调用一个回调函数,处理返回消息队列中的任何消息。然后你可以在这个回调函数中按照你想要的方式处理这些消息。

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()
1

你试过唯一一个完整的Python AMQP库吗?它不太常用,因为打包得不够整齐。

第一步:编译C库 - 你可能需要运行 sudo apt-get install autotools-dev autoconf automake libtool 来安装一些工具。

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

第二步:安装Python库。

pip install pylibrabbitmq
1

目前这是不可能的,因为当消息在代理(broker)中被丢弃时,basic.return 是异步发送的。也就是说,当消息成功发送后,服务器并不会报告任何数据。所以,pyAMQP 不能监听这些消息。

我看过一些关于这个问题的讨论。可能的解决方案有:

  • 使用 txAMQP,这是一个处理 basic.return 的 amqp 的 twisted 版本。
  • 使用 pyAMQP 并设置等待时间(timeout)。不过我不确定这现在是否可行。
  • 频繁地用同步命令向服务器发送请求,这样 pyAMQP 就能在 basic.return 消息到达时接收到它们。

由于 pyAMQP 和 rabbitMQ 的支持水平都比较低,我们决定完全不使用 amqp 代理。

撰写回答