在使用zeromq实现非阻塞发布机制时遇到问题

1 投票
2 回答
2933 浏览
提问于 2025-04-17 05:48

我想用zeromq来让两个进程(发送者和接收者)进行通信。如果接收者进程没有运行,我希望发送者进程可以继续执行,即使消息会丢失也没关系。

我尝试用发布-订阅模式来实现这个功能,但发现如果接收者没有运行,发送者就会卡住。例如,在下面的发送者源代码中:

import zmq

context = zmq.Context()
sender = context.socket(zmq.PUB)
sender.connect("tcp://localhost:5555")

sender.send("Sending to nobody", NOBLOCK)

print "Msg sent"

当接收者没有运行时,"Msg sent"这条消息就不会被打印出来,发送者会一直停在"sender.send('Sending to nobody', NOBLOCK)"这一步。而且,我还尝试通过connect函数来检查接收者是否在线,但无论如何返回的结果都是"None"。

我使用的是Python 2.6.5和zeromq 2.1。

有没有人知道发生了什么,或者有什么替代的解决方案?(我也尝试过使用PULL-PUSH和REQ-REP,但结果类似)

非常感谢!

2 个回答

3

你可以很简单地把 sender.connect("tcp://localhost:5555") 改成 sender.bind("tcp://localhost:5555"),接收端也可以反过来这样做。

这样做的好处是,你可以把接收端放在任何你想要的位置。发送端无论有没有接收端都能正常工作。

3

在你修正了示例之后:

import zmq

context = zmq.Context()
sender = context.socket(zmq.PUB)
sender.connect("tcp://localhost:5555")

sender.send("Sending to nobody", zmq.NOBLOCK)

print "Msg sent"

我看到的情况是“消息已发送”这句话被打印出来了,但脚本在这之后就卡住了,永远也不会退出。问题在于它在 close() 这个系统调用上卡住了。

你可以通过设置套接字的 LINGER 选项来改变这种行为:

sender = context.socket(zmq.PUB)
sender.setsockopt(zmq.LINGER, 100)

这里的值是延迟关闭的时间,单位是毫秒。想了解更多信息,可以查看 zmq_setsockopt 的手册。这个设置的实际效果是,ZMQ 在关闭套接字之前只会等待 延迟 毫秒。

不要把这个值设置得 低,因为那样会导致消息丢失,即使发送方在监听(因为 ZMQ 可能在消息实际送达之前就关闭了套接字)。

撰写回答