0mq 一对多连接

4 投票
2 回答
4086 浏览
提问于 2025-04-17 00:16

如何用0mq建立进程之间的双向通信是最正确的方式?我需要创建几个后台进程,这些进程会等待主进程的指令,进行一些计算,然后把结果返回给主进程。

2 个回答

3

可以考虑使用请求-回复代理,但把 REQ 插座换成 DEALER。DEALER 在发送时不会阻塞,并且会自动平衡流量到你的工作进程上。

在图中,Client 就是你的 main process,而 Service A/B/C 则是你的 background processes (workers)Main process 应该绑定到一个端点。Workers 则需要连接到主进程的端点,以接收工作任务。

main process 中,保持一个工作任务的列表,并记录发送时间。如果一段时间没有收到回复,就重新发送工作任务,因为 worker 可能已经挂掉了。

请求-回复代理

7

有几种方法可以做到这一点。最简单的方法可能是使用 REQ/REP 套接字。每个后台进程或工作者都会有一个 REP 套接字,而你可以用 REQ 套接字与它们进行通信:

import zmq

def worker(addr):
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(addr)
    while True:
        # get message from boss
        msg = socket.recv()
        # ...do smth
        # send back results
        socket.send(msg)

if __name__ == '__main__':
    # spawn 5 workers
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('tcp://127.0.0.1:500%d' % i,)).start()

你需要连接到每个工作者,发送消息,然后获取结果:

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect(worker_addr)
socket.send('message')
msg = socket.recv()

另一种方法是使用 PUB/SUB 来向工作者发送消息,同时使用 PUSH/PULL 来收集结果:

import zmq

def worker(worker_id, publisher_addr, results_addr):
    context = zmq.Context()
    sub = context.socket(zmq.SUB)
    sub.connect(publisher_addr)
    sub.setsockopt(zmq.SUBSCRIBE, worker_id)
    push = context.socket(zmq.PUSH)
    push.connect(results_addr)

    while True:
        msg = sub.recv_multipart()[1]
        # do smth, send off results
        push.send_multipart([worker_id, msg])

if __name__ == '__main__':
    publisher_addr = 'tcp://127.0.0.1:5000'
    results_addr = 'tcp://127.0.0.1:5001'

    # launch some workers into space
    from multiprocessing import Process
    for i in range(5):
        Process(target=worker, args=('worker-%d' % i, publisher_addr, results_addr,)).start()

如果你想向特定的工作者发送命令,可以这样做:

context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind(publisher_addr)
# send message to worker-1
pub.send_multipart(['worker-1', 'hello'])

然后拉取结果:

context = zmq.Context()
pull = context.socket(zmq.PULL)
pull.bind(results_addr)

while True:
    worker_id, result = pull.recv_multipart()
    print worker_id, result

撰写回答