与0mq通讯

2024-04-19 06:18:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试创建一个系统来收集发送的数据 我有两个小脚本,一个接收器.py,它应该接收数据,发送者应该发送数据,目前我尝试使用1-1连接,但最后我需要多个发送者和一个接收器来处理传入的数据。我尝试使用0mq发布者/订户模式来实现这一点。你知道吗

#receiver.py

def receive():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, 'Child:') 
    socket.bind('tcp://localhost:5000')
    while True:
        print 'Parent received: %s' % socket.recv()

receive()

#sender.py

def send(data):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://localhost:5000')
    socket.send('Sender: %i' % data)
    socket.close()
    print "sent"

send(10)

当我开始的时候接收器.py它只是等待数据,在我运行时不接收任何内容发件人.py. 我非常感谢您的建议,实际上我甚至不确定发布者/订阅者是否是适合我的场景的最佳模式(多个传感器通过本地网络将数据发送到一台服务器进行实时处理)。你知道吗


Tags: 数据pysendlocalhostdefcontext模式socket
2条回答

如果您无法使示例正常工作,那么这是一个好迹象,表明您的ZMQ库安装有问题,或者库版本与您使用的绑定不兼容。从那里开始检查,并且总是尝试先逐字地运行一个示例,以确保所有内容至少与引用代码一起工作。你知道吗

但是,我确实看到您的代码中至少有一个问题会导致您永远无法接收消息。你知道吗

在订阅服务器中,您订阅了“Child:”,但在发布服务器中,您从未发送与之匹配的消息。“正确”的方法是发送多帧消息,但为了代码简单,您也可以发送以主题开头的字符串,如下所示:

socket.send('Child: Sender: %i' % data)

或者,您可以更改订阅服务器以适应当前的消息模式:

socket.setsockopt(zmq.SUBSCRIBE, 'Sender:')

最后但并非最不重要的一点是,如果您想订阅发布者可能发送的所有内容,您可以订阅空字符串:

socket.setsockopt(zmq.SUBSCRIBE, '')

。。。最后一个可能适合你的情况。因此,结果代码如下:

#receiver.py

def receive():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.setsockopt(zmq.SUBSCRIBE, '') 
    socket.bind('tcp://localhost:5000')
    while True:
        print 'Parent received: %s' % socket.recv()

receive()

#sender.py

def send(data):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://localhost:5000')
    socket.send('Sender: %i' % data)
    socket.close()
    print "sent"

send(10)

处理分要点、问题和评论:

  • 在这个场景中,订阅者使用bind()和发布者使用connect()(正如您所做的那样)更合适,ZMQ不关心您仅基于套接字类型使用哪个。你的订阅者是你的“服务器”,你的发布者是你的“客户”,所以SUB应该bind(),PUB应该connect()。你知道吗
  • PUB/SUB对于您的场景来说是一种很好的模式,无论何时通信都是严格单向的,就像您的案例一样。你知道吗

为了理解发生了什么,我重写了它:

import zmq
import threading
import time

def receive():
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.bind("tcp://127.0.0.1:5000")
    socket.setsockopt(zmq.SUBSCRIBE, '')
    while True:
        print 'Parent received: %s' % socket.recv()

threading.Thread(target=receive).start()

def send(data):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("tcp://localhost:5000")
    while data:
        socket.send('Sender: %i' % data)
        data -= 1
        time.sleep(1)

输出:

>>> send(10)
Parent received: Sender: 9
Parent received: Sender: 8
Parent received: Sender: 7
Parent received: Sender: 6
Parent received: Sender: 5
Parent received: Sender: 4
Parent received: Sender: 3
Parent received: Sender: 2
Parent received: Sender: 1

重点是:

  1. Why doesn't zeromq work on localhost?中所述,必须使用127.0.0.1绑定localhost
  2. 发布者没有公开名称,所以我们必须从subcriber中删除Child:,或者将其更改为Sender:,或者在send()Sender:中更改Child:(我选择了第一个)
  3. 可能您需要某种事件来说明订阅服务器的连接在发送数据之前处于活动状态(zeromq是一个异步框架),否则您将丢失第一条消息。作为证明,您可以像下面的示例那样更改send(),并且不会丢失任何消息。你知道吗

是的。你知道吗

def send(data):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect("tcp://localhost:5000")
    while data:
        time.sleep(1)
        socket.send('Sender: %i' % data)
        data -= 1
    socket.close()

输出:

>>> send(10)
Parent received: Sender: 10
Parent received: Sender: 9
Parent received: Sender: 8
Parent received: Sender: 7
Parent received: Sender: 6
Parent received: Sender: 5
Parent received: Sender: 4
Parent received: Sender: 3
Parent received: Sender: 2
Parent received: Sender: 1

相关问题 更多 >