ZeroMQ推送套接字导致客户端在无进程监听时无法终止

4 投票
1 回答
1351 浏览
提问于 2025-04-17 11:15

我刚开始接触ZeroMQ,遇到了一个问题,就是我的客户端没有正常结束。具体来说,我有一个客户端可能会在没有接收服务器监听的情况下“推送”数据,这样就导致程序在Python代码执行完后卡住。我猜可能有一些后台线程需要清理——请告诉我该怎么做,或者指向相关文档。

这是相关的代码片段。如果我在没有监听器的情况下运行这个进程,并且“self.push”那行没有被注释掉,进程就会卡住。

def setup(self):
    print self.name, "connect to sockets"
    ctx = self.ctx = zmq.Context()
    self.pull = ctx.socket(zmq.PULL)
    self.pull.connect(self.ventillatorAddress)
    self.push = ctx.socket(zmq.PUSH)
    self.push.connect(self.sinkAddress)
    self.control = ctx.socket(zmq.SUB)
    self.control.connect(self.publisherAddress)
    self.control.setsockopt(zmq.SUBSCRIBE, "") # get every control message
    self.inbox = ctx.socket(zmq.SUB)
    self.inbox.connect(self.distributorAddress)
    self.inbox.setsockopt(zmq.SUBSCRIBE, self.name) # listen only for messages addressed with name
def start(self):
    print self.name,  "push worker is ready signal"
    # listen for "go" signal
    pollcount = 0
    go = False
    while not go:
        #print "send ready for", self.name
        #self.push.send(self.name+" ready")
        print "listen for 'go'"
        msg = self.recvPoll(self.control)
        if msg is None:
            pollcount += 1
            assert pollcount<10
            print "poll timeout", pollcount
            time.sleep(1)
            continue
        pollcount = 0
        print "recv'd", msg
        assert msg=="go!"
        go = True
    print "go signal received"
    pass

如果把那行注释掉(并且没有监听器),进程就能正常完成。我试过context.term()和context.destroy(),但似乎没有什么帮助。

我该怎么清理这个套接字?或者有没有其他的线索?提前谢谢!

1 个回答

5

这很可能是因为ZeroMQ的linger功能。引用一下手册上的内容:

ZMQ_LINGER选项用于设置指定套接字的等待时间。这个等待时间决定了在用zmq_close(3)关闭套接字后,尚未发送给对方的待发送消息会在内存中停留多久,同时也会影响用zmq_term(3)终止套接字上下文的过程。

默认情况下,ZeroMQ会无限期地等待,直到能够发送出那些卡住的消息。

你可以尝试将ZMQ_LINGER套接字选项设置为零或者一个很短的时间(以毫秒为单位)。

撰写回答