PAHO MQTT Python客户机确认丢失,保证subscrib的交付

2024-05-16 14:01:08 发布

您现在位置:Python中文网/ 问答频道 /正文

各位开发者

我正在查看Paho MQTT客户机(针对Python),我认为我理解发布服务器的QoS设置(例如传感器或任何数据源)确实有意义-您希望能够确保消息已被(代理/服务器)接收到。在

我还认为,从订阅者的角度来看,请求QoS“2”是有意义的,以确保MQTT服务器确实向订阅者发送每个消息,但我正在努力解决这个问题:似乎没有一种方法可以让订阅者发出成功(或不)处理接收到的消息的信号,换句话说,似乎缺少某种明确的确认方式?在

用例-成功处理消息

我想订阅一个主题并成功地处理每个数据点,例如通过存储到数据库。因此,我需要适应订阅者在“飞行中”失败的情况,即在收到来自代理的消息之后,但在成功处理(存储到数据库)之前。在

假设

现在,如果订阅服务器(固定客户机_id)在处理数据时发生故障,它将重新启动,然后重新连接到MQTT代理,代理通过id标识这个特定的客户机,并再次开始推送消息—从订阅者断开连接后的下一条消息开始—就代理而言,它确实成功地传递了最后一条消息(不知道订户崩溃了)。在

潜在解决方案

如果上述假设是真的,那么我最好不要使用固定的客户机_id,而是使用“clean_session”和一个随机的客户机_id;这样,代理就开始传递特定主题的所有消息。这当然会让订阅者负责跟踪成功处理的消息。在

是这样做的吗?或者有没有一种方法可以显式地确认订阅者成功地处理了一条消息,以便代理可以重新传输应该是-我对Paho Python库特别感兴趣。在

提前谢谢!在

编辑1:

相关代码:

def _handle_on_message(self, message):
    matched = False
    with self._callback_mutex:
        try:
            topic = message.topic
        except UnicodeDecodeError:
            topic = None

        if topic is not None:
            for callback in self._on_message_filtered.iter_match(message.topic):
                with self._in_callback:
                    callback(self, self._userdata, message)
                matched = True

        if matched == False and self.on_message:
            with self._in_callback:
                self.on_message(self, self._userdata, message)

来源:https://github.com/eclipse/paho.mqtt.python/blob/v1.3.1/src/paho/mqtt/client.py#L2631

编辑2:

@hardlib确实是正确的-当回调函数内部发生故障时,客户端代码不会向代理确认

一些说明代码

on_message订阅服务器中的回调

^{pr2}$

出版商

while True:
    count += 1
    logging.debug("At: " + str(count))
    msg = "message: {counter}".format(counter=count)
    mqttc.publish("paho/stacko", msg, qos=2, retain=False)

日志

出版商

root@14f00c2576b2:/usr/src/app# python publisher.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c1, k60) client_id=b''
DEBUG:root:At: 1
DEBUG:root:Sending PUBLISH (d0, q2, r0, m1), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Received PUBREC (Mid: 1)
DEBUG:root:Sending PUBREL (Mid: 1)
DEBUG:root:Received PUBCOMP (Mid: 1)
DEBUG:root:At: 2
DEBUG:root:Sending PUBLISH (d0, q2, r0, m2), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 2)
DEBUG:root:Sending PUBREL (Mid: 2)
DEBUG:root:Received PUBCOMP (Mid: 2)
DEBUG:root:At: 3
DEBUG:root:Sending PUBLISH (d0, q2, r0, m3), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 3)
DEBUG:root:Sending PUBREL (Mid: 3)
DEBUG:root:Received PUBCOMP (Mid: 3)
DEBUG:root:At: 4
DEBUG:root:Sending PUBLISH (d0, q2, r0, m4), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 4)
DEBUG:root:Sending PUBREL (Mid: 4)
DEBUG:root:Received PUBCOMP (Mid: 4)
DEBUG:root:At: 5
DEBUG:root:Sending PUBLISH (d0, q2, r0, m5), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 5)
DEBUG:root:Sending PUBREL (Mid: 5)
DEBUG:root:Received PUBCOMP (Mid: 5)
DEBUG:root:At: 6
DEBUG:root:Sending PUBLISH (d0, q2, r0, m6), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 6)
DEBUG:root:Sending PUBREL (Mid: 6)
DEBUG:root:Received PUBCOMP (Mid: 6)
DEBUG:root:At: 7
DEBUG:root:Sending PUBLISH (d0, q2, r0, m7), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 7)
DEBUG:root:Sending PUBREL (Mid: 7)
DEBUG:root:Received PUBCOMP (Mid: 7)
DEBUG:root:At: 8
DEBUG:root:Sending PUBLISH (d0, q2, r0, m8), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 8)
DEBUG:root:Sending PUBREL (Mid: 8)
DEBUG:root:Received PUBCOMP (Mid: 8)
DEBUG:root:At: 9
DEBUG:root:Sending PUBLISH (d0, q2, r0, m9), 'b'paho/stacko'', ... (10 bytes)
DEBUG:root:Received PUBREC (Mid: 9)
DEBUG:root:Sending PUBREL (Mid: 9)
DEBUG:root:Received PUBCOMP (Mid: 9)
DEBUG:root:At: 10
DEBUG:root:Sending PUBLISH (d0, q2, r0, m10), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 10)
DEBUG:root:Sending PUBREL (Mid: 10)
DEBUG:root:Received PUBCOMP (Mid: 10)
DEBUG:root:At: 11
DEBUG:root:Sending PUBLISH (d0, q2, r0, m11), 'b'paho/stacko'', ... (11 bytes)
DEBUG:root:Received PUBREC (Mid: 11)
DEBUG:root:Sending PUBREL (Mid: 11)
DEBUG:root:Received PUBCOMP (Mid: 11)

用户

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (0, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m1), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 1)
DEBUG:root:Received PUBREL (Mid: 1)
DEBUG:root:Message!!!! b'message: 2'
paho/stacko b'message: 2' mid:1
Num: 2
DEBUG:root:Sending PUBCOMP (Mid: 1)
DEBUG:root:Received PUBLISH (d0, q2, r0, m2), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Message!!!! b'message: 3'
paho/stacko b'message: 3' mid:2
Num: 3
Going to show myself out now (sys.exit(1))

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m3), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 3)
DEBUG:root:Received PUBLISH (d0, q2, r0, m4), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 4)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBREL (Mid: 3)
DEBUG:root:Message!!!! b'message: 4'
paho/stacko b'message: 4' mid:3
Num: 4
DEBUG:root:Sending PUBCOMP (Mid: 3)
DEBUG:root:Received PUBREL (Mid: 4)
DEBUG:root:Message!!!! b'message: 5'
paho/stacko b'message: 5' mid:4
Num: 5
DEBUG:root:Sending PUBCOMP (Mid: 4)
DEBUG:root:Received PUBLISH (d0, q2, r0, m5), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Message!!!! b'message: 6'
paho/stacko b'message: 6' mid:5
Num: 6
Going to show myself out now (sys.exit(1))

root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m6), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 6)
DEBUG:root:Message!!!! b'message: 7'
paho/stacko b'message: 7' mid:6
Num: 7
DEBUG:root:Sending PUBCOMP (Mid: 6)
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBLISH (d0, q2, r0, m7), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 7)
DEBUG:root:Received PUBREL (Mid: 7)
DEBUG:root:Message!!!! b'message: 8'
paho/stacko b'message: 8' mid:7
Num: 8
DEBUG:root:Sending PUBCOMP (Mid: 7)
DEBUG:root:Received PUBLISH (d0, q2, r0, m8), 'paho/stacko', ...  (10 bytes)
DEBUG:root:Sending PUBREC (Mid: 8)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Message!!!! b'message: 9'
paho/stacko b'message: 9' mid:8
Num: 9
Going to show myself out now (sys.exit(1))
root@ca7dcaaed68f:/usr/src/app# python subscriber.py
DEBUG:root:Sending CONNECT (u0, p0, wr0, wq0, wf0, c0, k60) client_id=b'client_02'
DEBUG:root:Received CONNACK (1, 0)
DEBUG:root:Connected
Connected with result code 0
DEBUG:root:Sending SUBSCRIBE (d0) [(b'#', 2)]
DEBUG:root:Received PUBREL (Mid: 2)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBREL (Mid: 8)
DEBUG:root:Received SUBACK
DEBUG:root:Received PUBLISH (d0, q2, r0, m9), 'paho/stacko', ...  (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 9)
DEBUG:root:Message!!!! b'message: 10'
paho/stacko b'message: 10' mid:9
Num: 10
DEBUG:root:Sending PUBCOMP (Mid: 9)
DEBUG:root:Received PUBREL (Mid: 5)
DEBUG:root:Received PUBLISH (d0, q2, r0, m10), 'paho/stacko', ...  (11 bytes)
DEBUG:root:Sending PUBREC (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 10)
DEBUG:root:Message!!!! b'message: 11'
paho/stacko b'message: 11' mid:10
Num: 11
DEBUG:root:Sending PUBCOMP (Mid: 10)
DEBUG:root:Received PUBREL (Mid: 2)

结论(目前)

MQTT(至少Mosquitto)在持久性方面确实如预期的那样工作:如果一个客户机重新连接,它可以对自上次连接以来丢失的那些消息进行“补齐”。但是,即使为订阅者和发布者都设置了qos=2,崩溃前的最后一条消息也不会被重新处理


Tags: debugmessagebytesrootpublishpahod0received
1条回答
网友
1楼 · 发布于 2024-05-16 14:01:08

如果您的客户机在处理消息时可能会失败,那么在开始处理消息之前,您应该将消息存储在某个地方(可能是磁盘上或数据库中)。在

然后,可以在客户端重新启动并尝试再次处理时检查此存储。如果您在重新连接(使用固定的客户端_id)之前执行此操作,则在尝试处理失败的消息时,您不必担心新消息被传递。在

编辑:

另外,在代码中更详细地查看,对于QOS2,传递确认的最后一段似乎只在on_message完成之后发送,因此,如果您在on_message中处理消息时崩溃,那么代理应该重新传递该消息。在

https://github.com/eclipse/paho.mqtt.python/blob/e9914a759f9f5b8081d59fd65edfd18d229a399e/src/paho/mqtt/client.py#L2506

相关问题 更多 >