Rabbitmq - 在同一通道上消费和发布?

6 投票
2 回答
6583 浏览
提问于 2025-04-18 15:34

我有两个Python程序,它们通过pika连接到RabbitMQ。每个程序都消费一组主题,而另一个程序则作为回应发布这些主题。一个程序使用SelectConnection,另一个使用TornadoConnection。

目前,这两个程序只是测试程序,用来模拟用户和我的服务器之间的对话,每个程序的on_message()方法都是硬编码的,根据收到的routing_key来决定发布什么样的回应给对方。

最开始,在随机的时间后,通常不超过2分钟,我会遇到一个错误,像这样:

UnexpectedFrame: Basic.publish: (505) UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead

在Stack Overflow和其他地方查了很多帖子后,我了解到这个错误和“竞争条件”有关,也就是说,有东西在基本发布(basic_publish)完成之前就被消费掉了。

我对代码做了一些修改,不再立即进行基本发布,而是给connection.add_timeout()传递了一个回调,延迟1秒后再执行。做了这个修改后,我成功地让两个程序之间的“对话”持续了超过1小时,而没有再出现那个错误。

我的问题是,这只是一个临时解决办法,因为我只是在模拟一个用户吗?我是否需要为消费和发布分别使用两个不同的通道?

def on_message(self, unused_channel, basic_deliver, properties, body):
    if self._sibling_app_id == properties.app_id:
        self.dispatch_message(basic_deliver, properties, body)


def dispatch_message(self, basic_deliver, properties, body):
    (user_id, msg_type) = basic_deliver.routing_key.rsplit('.', 1)

    if "login-response" == msg_type:
        print body
    elif "gid-assignment" == msg_type:
        print body
    elif "tutor-logout" == msg_type:
        print body
    elif "tutor-turn" == msg_type:
        message = "i don't know"
        routing_key = "%s.input" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    elif "nlu" == msg_type:
        message = "dnk"
        routing_key = "%s.nlu-response" % user_id
        callback = self.delayed_publish_message(routing_key, message)
        self.schedule_next_message(callback, 1)
    else:
        print "invalid message-type: %s" % msg_type
        print body

def delayed_publish_message(self, routing_key, message):
    """returns a callback which can be passed to schedule_next_message()"""
    def delayed_publish_cb():
        self.publish_message(routing_key, message)
    return delayed_publish_cb


def schedule_next_message(self, cb, publish_interval=None):
    if self._stopping:
        return
    if publish_interval is None:
        publish_interval = self.PUBLISH_INTERVAL
    if -1 == publish_interval:
        return
    self._connection.add_timeout(publish_interval, cb)


def publish_message(self, routing_key, message):
    if self._stopping:
        return
    properties = pika.BasicProperties(app_id=self._app_id,
                                                          content_type='text/plain')
    self._channel.basic_publish(self.EXCHANGE, routing_key,
                                                 message, properties)

2 个回答

1

我完成了我的提交,准备去睡觉时突然想明白了。原来,rabbitmq.com上的Python教程还是建议用以下方式安装pika:

 sudo pip install pika==0.9.8

虽然0.9.8版本是在2012年发布的,但我觉得这个修复是在那个版本发布之后才加上的,0.9.9版本是在2013年发布的。

所以,我做了:

sudo pip uninstall pika

然后按照pika网站上的安装说明进行操作:

sudo pip install pika

接着我把所有的connection.add_timeout(1, delayed_publish_cb)都换成了basic_publish(),然后我祈祷一切顺利,运行了它,结果我的两个进程在不到5分钟的时间里互相交换了大约200,000条消息,完全没有问题。

真高兴知道2012年的那个bug修复现在依然有效。

我得告诉rabbitmq的团队,让他们更新一下他们的教程。

9

一个通道是单向使用的。AMQP协议规范对此有很清楚的说明:

一个AMQP会话将两个单向通道关联起来,从而形成两个容器之间的双向、顺序对话。一个连接可以同时有多个独立的会话在活动,数量上限取决于商定的通道数量。每个连接和会话都被视为每一方的端点,存储着关于该连接或会话的本地状态和最后已知的远程状态。

因此,你应该为你的应用程序使用一个输入通道和一个输出通道。

撰写回答