每分钟尝试一次Qpid

2024-05-16 03:38:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我不熟悉Qpid Proton Python和AMQP。有一件事我有点被卡住了,我希望我能得到社区的一些支持

我的应用程序要求之一是,如果从我的应用程序到message broker(ActiveMQ)的连接丢失,则每分钟重新尝试一次连接

从源代码和这个:documentation(第5.2.4节,第14页)来看,似乎我可以在on_启动事件期间调用“container.connect()”方法时,为“reconnect”参数创建一个自定义退避实例

因此,我对自定义退避实例执行了类似的操作:

class Backoff:
    """
    A modified reconnect strategy that retries, every 60s. 
    Repeated calls to :meth:`next` returns a value for the next delay
    """

    def __init__(self):
        self.delay = 60

    def reset(self):
        """
        Reset the backoff delay to 60 seconds.
        (This method is required for Qpid Proton Library)
        """
        self.delay = 60

    def next(self):
        """
        Modified the backoff mechanism to attempt reconnect every 60s

        :return: The next delay in seconds.
        :rtype: ``float``
        """
        return self.delay

在on_启动期间:

def on_start(self, event):
    self.container = event.container
    self.conn = self.container.connect(
        url=self.url, user=self.user, password=self.password, reconnect=Backoff())

问题:

  1. 我可以知道如何测试这是否真的工作正常吗?我将print语句放在自定义退避实例的“next()”方法中,并断开Wifi以模拟断开连接,但是,重新连接的尝试似乎不起作用
  2. 在容器运行时,如何捕获断开连接事件并尝试重新连接

任何建议都将不胜感激,谢谢


Tags: theto实例self应用程序oncontainerdef