Python ZMQ与多进程导致zmq.error.ZMQError: 中断系统调用

5 投票
1 回答
2283 浏览
提问于 2025-04-18 10:59

我有一个Python脚本,里面绑定了几个(比如说5个)ZMQ接收套接字,代码是这样的:

receiver_1 = context.socket(zmq.PULL)
receiver_1.bind("tcp://*:5555")
...
receiver_5 = context.socket(zmq.PULL)
receiver_5.bind("tcp://*:5559")

receivers = [receiver_1, ..., receiver_5]

然后我启动了一些Google计算引擎的实例,它们连接到相应的发送套接字。

我想要并行地从这些套接字中获取数据,所以我尝试用多进程池来实现。代码大概是这样的:

def recv_result(i):
    result_str = receivers[i].recv()
    return cPickle.loads(result_str)

pool = multiprocessing.Pool()
while True:
    results = pool.map(recv_result, [i for i in range(len(receivers))])
    # break when all results have been received
    ...

但是当我运行我的脚本时,出现的错误是这样的:

Traceback (most recent call last):
  ...
  File ...
    results = pool.map(recv_result, [i for i in range(len(receivers))])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
zmq.error.ZMQError: Interrupted system call

我也尝试过用multiprocessing.Process来实现同样的功能,但得到的错误基本上是一样的,只是看起来更复杂。

我想做的是更高效地接收来自我的GCE实例的所有结果,因为我发现这部分是我脚本的瓶颈(在我现在的实现中,我只有一个接收套接字,它是串行地接收来自所有GCE实例的结果)。如果有人能帮我找出我当前代码中的错误,或者有什么更好的方法来实现我的目标,我将非常感激!

1 个回答

2

这里有几个小建议:

  • 很高兴你使用了ZeroMQ——它能帮你做很多事情,而且不需要写太多代码。
  • 不要过度优化。对于ZeroMQ的通信,使用多进程或多线程并不会带来什么好处,因为它本身就非常快,能够处理大量消息。
  • 如果你使用多线程或多进程,千万不要共享zmq上下文,每个线程都要有自己的上下文,否则会出问题。这可能就是你遇到异常的原因。
  • 如果你现在的代码使用的是阻塞的ZeroMQ发送和接收,建议改成非阻塞的。可以看看如何使用轮询。

撰写回答