PythonZeroMQ——一个订阅服务器的多个发布服务器?

2024-05-13 00:33:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我想编写一个python脚本(称之为父脚本),它执行以下操作:

(1)定义多维numpy数组

(2)forks10种不同的python脚本(称它们为子脚本)。它们中的每一个都必须能够在任何时间点(只要它们还活着)从(1)read获取numpy数组的内容。

(3)每个子脚本都将自己完成工作(子脚本不相互共享任何信息)

(4)在任何时间点,父脚本必须能够接受来自其所有子脚本的消息。这些消息将由父解析,并导致(1)中的numpy数组发生更改。


Linux环境中的python中工作时,我应该如何处理这个问题?我想用zeroMQ,让父节点成为一个订户,而子节点都是发布者;这有意义吗,还是有更好的方法?

此外,我如何允许所有连续读取定义的numpy数组的内容?


Tags: numpy脚本信息消息内容read节点定义
3条回答

sub通道不必是要绑定的通道,因此您可以绑定订户,并且每个子通道都可以连接到该通道并发送其消息。在这个特殊的例子中,我认为multiprocessing模块更适合,但是我觉得有必要提到:

import zmq
import threading

# So that you can copy-and-paste this into an interactive session, I'm
# using threading, but obviously that's not what you'd use

# I'm the subscriber that multiple clients are writing to
def parent():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, 'Child:')
    # Even though I'm the subscriber, I'm allowed to get this party 
    # started with `bind`
    socket.bind('tcp://127.0.0.1:5000')

    # I expect 50 messages
    for i in range(50):
        print 'Parent received: %s' % socket.recv()

# I'm a child publisher
def child(number):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    # And even though I'm the publisher, I can do the connecting rather
    # than the binding
    socket.connect('tcp://127.0.0.1:5000')

    for data in range(5):
        socket.send('Child: %i %i' % (number, data))
    socket.close()

threads = [threading.Thread(target=parent)] + [threading.Thread(target=child, args=(i,)) for i in range(10)]
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

特别是,文档的Core Messaging Patterns部分讨论了这样一个事实:对于模式,任何一方都可以绑定(而另一方可以连接)。

我认为使用PUSH/PULL套接字更有意义,因为您有一个标准的Ventilator - Workers - Sink场景,只是呼吸机和水槽是相同的过程。

另外,考虑使用multiprocessing模块而不是ZeroMQ。可能会容易一点。

在ZeroMQ中,每个端口只能有一个发布服务器。唯一(丑陋的)解决方法是在不同的端口上启动每个子PUB套接字,并让父进程监听所有这些端口。

但在0MQ上描述的管道模式,用户指南是更好的方法。

相关问题 更多 >