zeromq:如何防止无限等待?

83 投票
4 回答
95149 浏览
提问于 2025-04-17 02:59

我刚开始接触ZMQ,正在设计一个应用程序,它的工作流程是这样的:

  1. 多个客户端中的一个(这些客户端有随机的PULL地址)向5555端口的服务器发送请求。
  2. 服务器一直在等待客户端的请求。当有请求到达时,会为这个特定的请求启动一个工作进程。没错,多个工作进程可以同时存在。
  3. 当这个进程完成它的任务后,会把结果发送回客户端。

我认为这种PUSH/PULL的架构很适合这个场景。请纠正我如果我错了。


但是我该如何处理以下情况呢?

  1. 当服务器没有响应时,client_receiver.recv()会无限期等待。
  2. 客户端可能会发送请求,但之后会立即失败,这样工作进程就会一直卡在server_sender.send()那里。

那么我该如何在PUSH/PULL模型中设置类似超时的机制呢?


编辑:感谢用户938949的建议,我找到了一个可行的解决方案,并分享出来以供后人参考。

4 个回答

13

如果你只需要等待一个套接字(socket),那么与其创建一个 Poller,你可以这样做:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

你可以使用这个方法,如果你的超时时间会根据不同情况而变化,而不是直接设置 work_receiver.RCVTIMEO

19

这是我在参考了用户938949的回答和这个链接后做的一个快速解决方案。如果你有更好的方法,请分享你的答案,我会推荐你的答案

如果你想要一些持久的解决方案来提高可靠性,可以参考这个链接

zeromq的3.0版本(目前是测试版)支持在ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中设置超时。你可以查看这个链接了解更多。

服务器

zmq.NOBLOCK确保当没有客户端时,send()不会被阻塞。

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

客户端

poller对象可以监听多个接收套接字(具体可以参考上面提到的“Python Multiprocessing with ZeroMQ”)。我这里只链接了work_receiver。在无限循环中,客户端每隔1000毫秒进行一次轮询。如果在这段时间内没有收到消息,socks对象会返回空。

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"
88

如果你使用的是zeromq版本3.0或更高版本,你可以设置一个叫做RCVTIMEO的选项来控制接收超时:

client_receiver.RCVTIMEO = 1000 # in milliseconds

不过一般来说,你可以使用轮询器来处理:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

而且poller.poll()这个方法可以设置一个超时时间:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

如果没有东西可以接收,evts会是一个空列表。

你也可以用zmq.POLLOUT来检查发送是否能成功。

另外,如果你想处理可能出现问题的对等方,可以使用:

worker.send(msg, zmq.NOBLOCK)

这个方法会立即返回,如果发送无法完成,就会抛出一个ZMQError(zmq.EAGAIN)的错误。

撰写回答