如何重新连接RabbitMQ?

2024-06-10 02:50:05 发布

您现在位置:Python中文网/ 问答频道 /正文

一旦RabbitMQ从另一个数据源接收到消息,我的python脚本就必须不断地向它发送消息。python脚本发送它们的频率可以变化,比如1分钟到30分钟。

以下是我如何建立与RabbitMQ的连接:

  rabt_conn = pika.BlockingConnection(pika.ConnectionParameters("some_host"))
  channel = rbt_conn.channel()

我只是有个例外

pika.exceptions.ConnectionClosed

我怎样才能重新连接到它?最好的方法是什么?有什么“策略”吗?是否可以发送ping以保持连接的活动性或设置超时?

任何指点都会很感激的。


Tags: 脚本host消息channelrabbitmqsomeconnpika
2条回答

RabbitMQ使用心跳来检测和关闭“死”连接,并防止网络设备(防火墙等)终止“空闲”连接。从版本3.5.5开始,默认超时设置为60秒(以前为~10分钟)。从docs

Heartbeat frames are sent about every timeout / 2 seconds. After two missed heartbeats, the peer is considered to be unreachable.

Pika的阻塞连接的问题是,在进行某些API调用(例如,channel.basic_publish()connection.sleep()等)之前,它无法响应心跳。

到目前为止我发现的方法是:

增加或取消超时

RabbitMQ在建立连接时与客户端协商超时。理论上,应该可以使用heartbeat_interval参数用更大的值覆盖服务器默认值,但是当前的Pika版本(0.10.0)在服务器和客户端提供的值之间使用min值。此问题已在当前的master上修复。

另一方面,可以通过将heartbeat_interval参数设置为0来完全停用heartbeat功能,这可能会使您陷入新问题(防火墙断开连接等)

重新连接

扩展@itsafire的答案,您可以编写自己的publisher类,以便在需要时重新连接。一个简单的实现示例:

import logging
import json
import pika

class Publisher:
    EXCHANGE='my_exchange'
    TYPE='topic'
    ROUTING_KEY = 'some_routing_key'

    def __init__(self, host, virtual_host, username, password):
        self._params = pika.connection.ConnectionParameters(
            host=host,
            virtual_host=virtual_host,
            credentials=pika.credentials.PlainCredentials(username, password))
        self._conn = None
        self._channel = None

    def connect(self):
        if not self._conn or self._conn.is_closed:
            self._conn = pika.BlockingConnection(self._params)
            self._channel = self._conn.channel()
            self._channel.exchange_declare(exchange=self.EXCHANGE,
                                           type=self.TYPE)

    def _publish(self, msg):
        self._channel.basic_publish(exchange=self.EXCHANGE,
                                    routing_key=self.ROUTING_KEY,
                                    body=json.dumps(msg).encode())
        logging.debug('message sent: %s', msg)

    def publish(self, msg):
        """Publish msg, reconnecting if necessary."""

        try:
            self._publish(msg)
        except pika.exceptions.ConnectionClosed:
            logging.debug('reconnecting to queue')
            self.connect()
            self._publish(msg)

    def close(self):
        if self._conn and self._conn.is_open:
            logging.debug('closing queue connection')
            self._conn.close()

其他可能性

我还没有探索的其他可能性:

非常简单:像这样的模式。

import time

while True:
    try:
        communication_handles = connect_pika()
        do_your_stuff(communication_handles)
    except pika.exceptions.ConnectionClosed:
        print 'oops. lost connection. trying to reconnect.'
        # avoid rapid reconnection on longer RMQ server outage
        time.sleep(0.5) 

您可能需要重新考虑您的代码,但基本上是为了捕获异常、缓解问题并继续执行您的工作。 communication_handles包含所有pika元素,如通道、队列以及您的东西需要通过pika与RabbitMQ通信的任何内容。

相关问题 更多 >