Rabbitmq - 在同一通道上消费和发布?
我有两个Python程序,它们通过pika连接到RabbitMQ。每个程序都消费一组主题,而另一个程序则作为回应发布这些主题。一个程序使用SelectConnection,另一个使用TornadoConnection。
目前,这两个程序只是测试程序,用来模拟用户和我的服务器之间的对话,每个程序的on_message()方法都是硬编码的,根据收到的routing_key来决定发布什么样的回应给对方。
最开始,在随机的时间后,通常不超过2分钟,我会遇到一个错误,像这样:
UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
在Stack Overflow和其他地方查了很多帖子后,我了解到这个错误和“竞争条件”有关,也就是说,有东西在基本发布(basic_publish)完成之前就被消费掉了。
我对代码做了一些修改,不再立即进行基本发布,而是给connection.add_timeout()传递了一个回调,延迟1秒后再执行。做了这个修改后,我成功地让两个程序之间的“对话”持续了超过1小时,而没有再出现那个错误。
我的问题是,这只是一个临时解决办法,因为我只是在模拟一个用户吗?我是否需要为消费和发布分别使用两个不同的通道?
def on_message(self, unused_channel, basic_deliver, properties, body):
if self._sibling_app_id == properties.app_id:
self.dispatch_message(basic_deliver, properties, body)
def dispatch_message(self, basic_deliver, properties, body):
(user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)
if "login-response" == msg_type:
print body
elif "gid-assignment" == msg_type:
print body
elif "tutor-logout" == msg_type:
print body
elif "tutor-turn" == msg_type:
message = "i don't know"
routing_key = "%s.input" % user_id
callback = self.delayed_publish_message(routing_key, message)
self.schedule_next_message(callback, 1)
elif "nlu" == msg_type:
message = "dnk"
routing_key = "%s.nlu-response" % user_id
callback = self.delayed_publish_message(routing_key, message)
self.schedule_next_message(callback, 1)
else:
print "invalid message-type: %s" % msg_type
print body
def delayed_publish_message(self, routing_key, message):
"""returns a callback which can be passed to schedule_next_message()"""
def delayed_publish_cb():
self.publish_message(routing_key, message)
return delayed_publish_cb
def schedule_next_message(self, cb, publish_interval=None):
if self._stopping:
return
if publish_interval is None:
publish_interval = self.PUBLISH_INTERVAL
if -1 == publish_interval:
return
self._connection.add_timeout(publish_interval, cb)
def publish_message(self, routing_key, message):
if self._stopping:
return
properties = pika.BasicProperties(app_id=self._app_id,
content_type='text/plain')
self._channel.basic_publish(self.EXCHANGE, routing_key,
message, properties)
2 个回答
我完成了我的提交,准备去睡觉时突然想明白了。原来,rabbitmq.com上的Python教程还是建议用以下方式安装pika:
sudo pip install pika==0.9.8
虽然0.9.8版本是在2012年发布的,但我觉得这个修复是在那个版本发布之后才加上的,0.9.9版本是在2013年发布的。
所以,我做了:
sudo pip uninstall pika
然后按照pika网站上的安装说明进行操作:
sudo pip install pika
接着我把所有的connection.add_timeout(1, delayed_publish_cb)都换成了basic_publish(),然后我祈祷一切顺利,运行了它,结果我的两个进程在不到5分钟的时间里互相交换了大约200,000条消息,完全没有问题。
真高兴知道2012年的那个bug修复现在依然有效。
我得告诉rabbitmq的团队,让他们更新一下他们的教程。
一个通道是单向使用的。AMQP协议规范对此有很清楚的说明:
一个AMQP会话将两个单向通道关联起来,从而形成两个容器之间的双向、顺序对话。一个连接可以同时有多个独立的会话在活动,数量上限取决于商定的通道数量。每个连接和会话都被视为每一方的端点,存储着关于该连接或会话的本地状态和最后已知的远程状态。
因此,你应该为你的应用程序使用一个输入通道和一个输出通道。