如何在AMQP的Python客户端中监听basic.return
我想确认我的消息是否成功发送到了队列。
为此,我在基本发布时添加了一个必须的参数。还有什么其他方法可以让我收到 basic.return
消息,以便知道我的消息没有成功发送吗?
我不能使用 channel.wait()
来监听 basic.return
,因为当我的消息成功发送时,wait()
函数会一直卡在那里,根本不会结束。(没有超时设置)另一方面,如果我不调用 channel.wait()
,那么即使消息没有发送成功,channel.returned_messages
也会一直是空的。
我使用的是 py-amqplib
版本 0.6。
任何解决方案都欢迎。
3 个回答
1
你不能同步地这样做,因为这是一个异步系统。不过,你可以通过使用线程来解决这个问题。
基本的思路是,你启动一个线程,这个线程会在通道上等待。当它从等待中出来时,它会调用一个回调函数,处理返回消息队列中的任何消息。然后你可以在这个回调函数中按照你想要的方式处理这些消息。
def registerCallback(channel, call_back): """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange. """ def wait(): try: channel.wait() except Exception, e: print("Problem waiting on publish channel: %s" % str(e)) while not channel.returned_messages.empty(): returnedMessage = channel.returned_messages.get() processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage)) processReturnedMessageThread.start() wait() waiting = Thread(target=wait) waiting.start()
1
你试过唯一一个完整的Python AMQP库吗?它不太常用,因为打包得不够整齐。
第一步:编译C库 - 你可能需要运行 sudo apt-get install autotools-dev autoconf automake libtool
来安装一些工具。
mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install
第二步:安装Python库。
pip install pylibrabbitmq
1
目前这是不可能的,因为当消息在代理(broker)中被丢弃时,basic.return
是异步发送的。也就是说,当消息成功发送后,服务器并不会报告任何数据。所以,pyAMQP 不能监听这些消息。
我看过一些关于这个问题的讨论。可能的解决方案有:
- 使用 txAMQP,这是一个处理
basic.return
的 amqp 的 twisted 版本。 - 使用 pyAMQP 并设置等待时间(timeout)。不过我不确定这现在是否可行。
- 频繁地用同步命令向服务器发送请求,这样 pyAMQP 就能在
basic.return
消息到达时接收到它们。
由于 pyAMQP 和 rabbitMQ 的支持水平都比较低,我们决定完全不使用 amqp 代理。