ZMQ在PUB-SUB中的延迟(慢订阅者)
我在网上找到很多类似的问题,但都没能帮我解决我的困扰。
我使用的是:
- Linux Ubuntu 14.04
- python 3.4
- zmq : 4.0.4 // pyZMQ 14.3.1
简而言之
在ZMQ的订阅(SUB)套接字中,接收队列不断增长,即使我设置了高水位标记(HWM)。这种情况发生在订阅者的速度比发布者慢的时候。 我该怎么做才能防止这种情况呢?
背景
我在人与计算机交互的领域工作。我们有一个庞大的代码库来控制鼠标光标之类的东西。我想把它“拆分”成几个模块,通过ZMQ进行通信。 我希望延迟尽可能小,但丢失消息并不是那么重要。
另一个有趣的方面是可以在节点之间添加“监视者”。因此,发布/订阅(PUB/SUB)套接字似乎是最合适的选择。
大致是这样的:
+----------+ +-----------+ +------------+
| | PUB | | PUB | |
| Input | +----+------> | Filter | +----+------> | Output |
| | | SUB | | | SUB | |
+----------+ v +-----------+ v +------------+
+-----+ +-----+
|Spy 1| |Spy 2|
+-----+ +-----+
问题
一切正常,除了当我们添加监视者的时候。 如果我们添加一个进行“重负载”操作的监视者,比如用matplotlib进行实时可视化,我们会发现图表的延迟在增加。也就是说:在上面的图中,过滤和输出都很快,没有延迟,但在监视者2上,延迟可能在运行20分钟后达到10分钟(!!)
看起来接收方的队列在不断增长。 我们研究了ZMQ的高水位标记(HWM)功能,试图把它设置得低一些以丢弃旧消息,但没有任何变化。
最小代码
架构:
+------------+ +-------------+
| | PUB | |
| sender | -------------> | receiver |
| | SUB| |
+------------+ +-------------+
接收方是一个慢接收器(在第一个图中充当监视者)
代码:
Sender.py
import time
import zmq
ctx = zmq.Context()
sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10
i = 0
while True:
mess = "{} {}".format(i, time.time())
sender.send_string(mess)
print("Send : {}".format(mess))
i+= 1
receiver.py:
import time
import zmq
ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)
front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)
front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')
print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1
while True:
mess = front_end.recv_string()
i, t = mess.split(" ")
mess = "{} {}".format(i, time.time() - float(t))
print("received : {}".format(mess))
time.sleep(1) # slow
我觉得这不是ZMQ发布/订阅的正常行为。 我尝试在接收器、订阅者和两者中设置HWM,但没有任何变化。
我遗漏了什么呢?
补充:
我觉得我在解释问题时没有说清楚。我实现了一个移动鼠标光标的功能。输入是以200Hz的频率通过ZMQ发送的鼠标光标位置(用.sleep( 1.0 / 200 )
),经过一些处理后更新鼠标光标的位置(在我的最小示例中没有这个sleep)。
一切都很顺利,即使我启动了监视者。尽管如此,监视者的延迟还是在增加(因为处理速度慢)。这种延迟在光标的“管道”末尾并没有出现。
我认为问题出在慢速订阅者排队消息上。
在我的示例中,如果我们关闭发送者而让接收者继续运行,消息会继续显示,直到所有(?)提交的消息都被显示。
监视者正在绘制光标的位置以提供反馈,但有这么大的延迟还是很不方便……我只想获取最后发送的消息,这就是我尝试降低HWM的原因。
2 个回答
来自 http://zguide.zeromq.org/page:all#toc50:
当你的套接字(socket)达到它的高水位标(HWM)时,根据套接字的类型,它会选择阻塞(也就是暂停发送)或者丢弃数据。比如,PUB和ROUTER类型的套接字在达到高水位标时会丢弃数据,而其他类型的套接字则会阻塞。在inproc传输中,发送方和接收方共享同样的缓冲区,所以实际的高水位标是双方设置的高水位标之和。
所以SUB类型的套接字并不会真的丢弃旧消息。你可以通过一些技巧使用路由器来实现丢弃订阅者的功能,或者考虑设计一个可以应对慢速元素的方案。使用ZeroMQ的一个好处是,你的核心代码大部分可以保持不变,你可能只需要调整处理套接字的包装部分。
缺少更好的实时设计/验证
ZeroMQ 是一个强大的消息传递层。
也就是说,检查一下它在原始的 while True:
循环中,实际上每秒发送了多少消息。
测量一下。要基于事实设计,而不是凭感觉。
事实很重要。
start_CLK = time.time() # .SET _CLK
time.sleep( 0.001) # .NOP avoid DIV/0!
i = 0 # .SET CTR
while True: # .LOOP
sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB
print i / ( time.time() - start_CLK ) # .GUI perf [msg/sec]
i+= 1 # .INC CTR
ZeroMQ 尽力将消息快速传递到下一个环节。
而且它在这方面做得相当不错。
你的 [Filter
] + [Spy1
] + [Output
] + [Spy2
] 的处理流程,从头到尾,要么
- 比 [
Input
] 发送者更快,包括 .send() 和 .recv_string() 的开销
要么
- 成为主要的阻塞因素,导致内部的 PUB/SUB 队列不断增长
这个队列链的问题可以通过另一种架构设计来解决。
需要重新考虑的事情:
对 [
Filter
].send() 的频率进行子采样(交错因子取决于你控制的实时过程的稳定性问题——比如 1 毫秒(顺便说一下,这是操作系统定时器的分辨率,所以用现成的操作系统定时器控制无法进行量子物理实验 :o),双向语音流为 10 毫秒,电视/图形界面流为 50 毫秒,键盘事件流为 300 毫秒等)在线 与 离线 后处理/可视化(你会发现有很重的
matplotlib
处理,通常会有 800 - 1600 - 3600 毫秒的开销,即使是简单的 2D 图形——在决定改变 PUB/SUB-<proc1>-PUB/SUB-<proc2> 处理架构之前,要测量一下(你已经注意到,<spy2> 在增长 <proc2>-PUB 供给和发送开销方面造成了问题)。线程数量与本地核心数量的关系——从本地 IP 看,所有进程都在同一个本地计算机上。此外,每个 ZMQ.Context 使用一个线程,还要考虑 Python GIL 锁定的开销,如果所有线程都是从同一个 Python 解释器实例化的……阻塞会增加。阻塞会造成问题。更好的分布式架构可以改善这些性能方面。不过,先查看 [1] 和 [2]。
注意:将 20 分钟的处理管道延迟(一个实时系统的时间域偏差)称为延迟,这种说法有点过于美化了。