ZeroMQ工作者如何安全“挂断”?
我这周开始使用ZeroMQ,使用请求-响应模式时,我不太确定怎么让一个工作者安全地“挂掉”并关闭他的套接字,而不至于丢失消息,导致发送消息的客户得不到响应。想象一下,一个用Python写的工作者大概是这样的:
import zmq
c = zmq.Context()
s = c.socket(zmq.REP)
s.connect('tcp://127.0.0.1:9999')
while i in range(8):
s.recv()
s.send('reply')
s.close()
我做了一些实验,发现一个在127.0.0.1:9999
的客户,使用类型为zmq.REQ
的套接字,进行公平排队请求时,可能会很不幸地在工作者完成最后一次send()
后,公平排队算法刚好选择了这个工作者,但在它执行接下来的close()
方法之前。在这种情况下,请求似乎是被工作者进程中的ØMQ栈接收并缓冲的,但当close()
方法把与套接字相关的所有东西都丢掉时,请求就会丢失。
那么,工作者怎么才能“安全地”断开连接呢?有没有办法发出信号表示“我不想再接收消息了”,然后 (a) 循环处理在发出信号期间到达的任何最后消息,(b) 生成它们的回复,然后 (c) 执行close()
,确保没有消息被丢弃呢?
编辑:我想我想要进入的原始状态是“半关闭”状态,在这个状态下,不会再接收新的请求——发送者会知道这一点——但返回路径仍然是开放的,这样我可以检查我的输入缓冲区,看看是否有最后一条到达的消息,如果有的话就进行回复。
编辑:为了回应一个好的问题,我修正了描述,使等待消息的数量变为复数,因为可能有很多连接在等待回复。
6 个回答
我也在考虑这个问题。你可能想要实现一个关闭消息,通知客户这个工作者要离开了。然后,你可以让这个工作者在关闭之前再运行一段时间。虽然这不是最理想的办法,但可能还是能用得上。
我觉得你的消息传递架构有问题。你的工作者应该使用REQ套接字来发送工作请求,这样每个工作者只会排队一个工作。然后为了确认工作完成,你可以用另一个REQ请求来同时确认上一个工作的完成并请求新的工作,或者你可以使用第二个控制套接字。
有些人用PUB/SUB来处理控制,这样每个工作者就可以发布确认消息,而主控端则订阅这些消息。
你要记住,使用ZeroMQ时是没有消息队列的,完全没有!只有根据一些设置(比如高水位线和套接字类型)在发送方或接收方缓冲的消息。如果你真的需要消息队列,那你就得写一个代理应用来处理,或者干脆换成AMQP,这样所有的通信都是通过第三方代理来进行的。
你似乎认为你在试图避免一个“简单”的竞争条件,比如在
... = zmq_recv(fd);
do_something();
zmq_send(fd, answer);
/* Let's hope a new request does not arrive just now, please close it quickly! */
zmq_close(fd);
但我觉得问题在于公平排队(轮询)让事情变得更复杂:你的工作者可能已经有几个请求在排队了。如果轮到发送者发送请求,它不会等你的工作者空闲再发送新请求,所以当你调用 zmq_send
的时候,可能已经有其他请求在等待了。
实际上,看起来你可能选择了错误的数据方向。与其让请求池发送请求给你的工作者(即使你不想接收新的请求),你可能更想让你的工作者从请求队列中获取新请求,处理完后再发送答案。
当然,这意味着要使用 XREP
/XREQ
,但我觉得这样做是值得的。