ZeroMQ PUB套接字连接时缓冲所有输出数据

14 投票
6 回答
12332 浏览
提问于 2025-04-17 10:45

我注意到,当一个zeromq的PUB套接字正在连接时,它会把所有要发送的数据都先存起来,比如说:

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

如果订阅者在这些消息之后再连接过来,理论上这些消息应该被丢弃,因为PUB在没有人连接的时候应该丢掉消息。但实际上,它并没有丢掉,而是把所有消息都缓存在这里。

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

正如你所看到的,那些“消息不应该被丢掉”的内容被套接字缓存在这里,一旦连接成功,它就会把这些消息发送到SUB套接字。如果我在PUB套接字上绑定,然后在SUB套接字上连接,这样就能正常工作了。

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

而且你只能看到输出

'hi'

这种奇怪的行为会导致问题,因为它在连接的套接字上缓存了所有数据。我有两个服务器,服务器A向服务器B发布数据。

Server A -- publish --> Server B

如果服务器B在线,一切都正常。但如果我先启动服务器A,而不启动服务器B呢?

结果是,服务器A上的连接PUB套接字会一直保存这些数据,内存使用量会越来越高。

这里的问题是,这种行为是个bug还是特性?如果是特性的话,我在哪里能找到提到这种行为的文档?我该如何停止连接的PUB套接字缓存所有数据呢?

谢谢。

6 个回答

1

他们在这个套接字上设置了高水位标记(HWM)选项。

5

我觉得这个行为是zmq_connect()的本质。也就是说:当zmq_connect()成功返回时,连接在概念上已经建立,因此你的发布者(PUB)开始排队消息,而不是丢弃它们

下面这段摘自"ZMQ指南"的内容给了我们一些提示:

理论上,使用ØMQ套接字时,连接的哪一端和绑定的哪一端并不重要。然而,对于发布-订阅(PUB-SUB)套接字来说,如果你绑定了订阅者(SUB)套接字并连接了发布者(PUB)套接字,订阅者可能会收到旧消息,也就是在订阅者启动之前发送的消息。这是绑定和连接工作方式的一个特性。如果可以的话,最好是绑定发布者并连接订阅者。

zmq_connect()的以下部分中有一些提示,如下所示:

与传统套接字的主要区别

一般来说,传统套接字提供的是一个同步接口,可以用于连接导向的可靠字节流(SOCK_STREAM)或无连接的不可靠数据报(SOCK_DGRAM)。相比之下,ØMQ套接字提供的是一个异步消息队列的抽象,具体的排队语义取决于使用的套接字类型。传统套接字传输的是字节流或离散的数据报,而ØMQ套接字传输的是离散的消息。

ØMQ套接字是异步的,这意味着物理连接的建立和拆除、重新连接以及有效传递的时间对用户是透明的,都是由ØMQ自己来管理。此外,当某个对端无法接收消息时,消息可能会被排队。

7

一个套接字是阻塞还是丢弃消息,取决于它的类型,具体可以参考ZMQ::Socket 文档(以下强调部分是我加的):

ZMQ::HWM:获取高水位线

ZMQ::HWM 选项可以获取指定套接字的高水位线。高水位线是一个硬性限制,表示在内存中,0MQ可以为与该套接字通信的每个对端最多排队的消息数量。

如果达到了这个限制,套接字将进入一个特殊状态,根据套接字的类型,0MQ会采取相应的措施,比如阻塞或丢弃发送的消息。具体每种套接字类型采取的措施可以参考 ZMQ::Socket 中的描述。

默认的 ZMQ::HWM 值为零,意味着“没有限制”。

你可以通过查看套接字类型的文档来判断它是会阻塞还是丢弃,查找 ZMQ::HWM 选项的动作,这个动作要么是 Block(阻塞),要么是 Drop(丢弃)。

对于 ZMQ::PUB,它的动作是 Drop,所以如果没有丢弃消息,你应该检查一下 HWM(高水位线)值,并注意这个警告:默认的 ZMQ::HWM 值为零,意味着“没有限制”,这表示它不会进入特殊状态,直到系统内存耗尽(到那时我就不知道它会怎么表现了)。

撰写回答