确保ZeroMQ中的工作进程总是正常终止

3 投票
1 回答
699 浏览
提问于 2025-04-16 22:17

我正在用Python的zeroMQ实现一个管道模式。

任务会分发给多个工作者,这些工作者会通过一个无限循环来监听新的任务,代码大概是这样的:

    while True:
        socks = dict(self.poller.poll())
        if self.receiver in socks and socks[self.receiver] == zmq.POLLIN:
            msg = self.receiver.recv_unicode(encoding='utf-8')
            self.process(msg)
        if self.hear in socks and socks[self.hear] == zmq.POLLIN:
            msg = self.hear.recv()
            print self.pid,":",  msg
            sys.exit(0)

当他们收到来自“汇聚节点”的消息,确认已经收到了所有预期的结果时,就会退出。

不过,有时候工作者可能会错过这样的消息,导致没有完成任务。那么,有什么好的方法可以确保工作者总是能完成任务呢?因为除了前面提到的消息,他们没有其他方式知道没有更多的任务需要处理。

这是我写的测试代码,用来检查工作者的状态:

#-*- coding:utf-8 -*-
"""
Test module containing tests for all modules of pypln 

"""
import unittest
from servers.ventilator import Ventilator
from subprocess import Popen, PIPE
import time
class testWorkerModules(unittest.TestCase):
    def setUp(self):
        self.nw = 4
        #spawn 4 workers
        self.ws = [Popen(['python', 'workers/dummy_worker.py'], stdout=None) for i in range(self.nw)]
        #spawn a sink
        self.sink = Popen(['python', 'sinks/dummy_sink.py'], stdout=None)
        #start a ventilator
        self.V = Ventilator()
        # wait for workers and sinks to connect
        time.sleep(1)

    def test_send_unicode(self):
        '''
        Pushing unicode strings through workers to sinks.
        '''

        self.V.push_load([u'são joão' for i in xrange(80)])
        time.sleep(1)
        #[p.wait() for p in self.ws]#wait for the workers to terminate
        wsr = [p.poll() for p in self.ws]
        while None in wsr:
            print wsr, [p.pid for p in self.ws if p.poll() == None] #these are the unfinished workers
            time.sleep(0.5)
            wsr = [p.poll() for p in self.ws]
        self.sink.wait()
        self.sink = self.sink.returncode
        self.assertEqual([0]*self.nw, wsr)
        self.assertEqual(0, self.sink)

if __name__ == '__main__':
    unittest.main()

1 个回答

1

所有的消息传递最终都会涉及到“心跳”这个概念。简单来说,如果你是一个工作者(比如说程序的一部分),发现你需要用的某个组件已经不工作了,你基本上有两个选择:要么尝试去连接其他地方,要么就自己退出。所以,如果你作为工作者发现那个组件不再存在了,就直接退出吧。这也意味着,即使那个组件还在,但连接已经断了,你也可以选择退出。不过我不太确定你还能做些什么,也许可以把所有的超时时间设置得更合理一些……

撰写回答