zeromq:如何防止无限等待?
我刚开始接触ZMQ,正在设计一个应用程序,它的工作流程是这样的:
- 多个客户端中的一个(这些客户端有随机的PULL地址)向5555端口的服务器发送请求。
- 服务器一直在等待客户端的请求。当有请求到达时,会为这个特定的请求启动一个工作进程。没错,多个工作进程可以同时存在。
- 当这个进程完成它的任务后,会把结果发送回客户端。
我认为这种PUSH/PULL的架构很适合这个场景。请纠正我如果我错了。
但是我该如何处理以下情况呢?
- 当服务器没有响应时,client_receiver.recv()会无限期等待。
- 客户端可能会发送请求,但之后会立即失败,这样工作进程就会一直卡在server_sender.send()那里。
那么我该如何在PUSH/PULL模型中设置类似超时的机制呢?
编辑:感谢用户938949的建议,我找到了一个可行的解决方案,并分享出来以供后人参考。
4 个回答
如果你只需要等待一个套接字(socket),那么与其创建一个 Poller
,你可以这样做:
if work_receiver.poll(1000, zmq.POLLIN):
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
你可以使用这个方法,如果你的超时时间会根据不同情况而变化,而不是直接设置 work_receiver.RCVTIMEO
。
这是我在参考了用户938949的回答和这个链接后做的一个快速解决方案。如果你有更好的方法,请分享你的答案,我会推荐你的答案。
如果你想要一些持久的解决方案来提高可靠性,可以参考这个链接。
zeromq的3.0版本(目前是测试版)支持在ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中设置超时。你可以查看这个链接了解更多。
服务器
zmq.NOBLOCK确保当没有客户端时,send()不会被阻塞。
import time
import zmq
context = zmq.Context()
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
i=0
while True:
i=i+1
time.sleep(0.5)
print ">>sending message ",i
try:
ventilator_send.send(repr(i),zmq.NOBLOCK)
print " succeed"
except:
print " failed"
客户端
poller对象可以监听多个接收套接字(具体可以参考上面提到的“Python Multiprocessing with ZeroMQ”)。我这里只链接了work_receiver。在无限循环中,客户端每隔1000毫秒进行一次轮询。如果在这段时间内没有收到消息,socks对象会返回空。
import time
import zmq
context = zmq.Context()
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")
poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)
# Loop and accept messages from both channels, acting accordingly
while True:
socks = dict(poller.poll(1000))
if socks:
if socks.get(work_receiver) == zmq.POLLIN:
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
如果你使用的是zeromq版本3.0或更高版本,你可以设置一个叫做RCVTIMEO的选项来控制接收超时:
client_receiver.RCVTIMEO = 1000 # in milliseconds
不过一般来说,你可以使用轮询器来处理:
poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
而且poller.poll()
这个方法可以设置一个超时时间:
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
如果没有东西可以接收,evts
会是一个空列表。
你也可以用zmq.POLLOUT
来检查发送是否能成功。
另外,如果你想处理可能出现问题的对等方,可以使用:
worker.send(msg, zmq.NOBLOCK)
这个方法会立即返回,如果发送无法完成,就会抛出一个ZMQError(zmq.EAGAIN)的错误。