绑定、发布、解绑、重复
peer.py
import zmq
import time
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.PUB)
while True:
print "I: Publishing"
socket.bind("tcp://*:5555")
socket.send_multipart(['general', 'Unique peer information'])
socket.unbind("tcp://*:5555")
time.sleep(1)
scanner.py
import zmq
if __name__ == '__main__':
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, 'general')
socket.connect("tcp://localhost:5555")
print "I: Scanning 5555"
while True:
message = socket.recv_multipart()
print "I: Receiving: {}".format(message)
我正在尝试在同一台电脑上通过同一个端口广播多个“节点”,并让一个“扫描器”监听,看看有哪些节点是可用的。每个节点会广播它的联系信息,然后客户端可以使用这个扫描器来找出哪些节点可用,再通过广播的信息建立连接,使用请求/响应的方式。
为了实现这个功能,我试着快速绑定一个发布(PUB)套接字,广播一个节点的信息,然后解除绑定,这样其他节点就可以在不同的时间绑定到同一个套接字,并广播下一个节点的不同标识信息。
我怀疑在解除绑定时,消息可能会被丢弃(调用close()
时也会发生同样的情况),但我不知道如何在关闭连接之前清空队列,以避免丢失任何消息。
有没有什么想法?
- Windows 8.1 x64
- Python 2.7.7 x64
- PyZMQ 4.0.4
编辑
我在ZMQ邮件列表上问了同样的问题,得到了有趣的回复,可以查看这里: http://lists.zeromq.org/pipermail/zeromq-dev/2014-June/026444.html
2 个回答
根据在ZMQ邮件列表上的反馈,我决定采用这个建议,直到有更合适的方案出现。现在的问题是,我只能在任何时候运行一个扫描器。理想情况下,扫描器应该彼此独立,也不依赖于其他的同伴。
大家有什么想法吗?
peer.py
import zmq
import time
import uuid
unique_id = uuid.uuid4().get_urn()
if __name__ == '__main__':
while True:
print "I: Publishing {}".format(unique_id)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.connect("tcp://127.0.0.1:5555")
socket.send_multipart(['general', unique_id])
socket.close()
context.destroy()
# Wait for a random amount of time
time.sleep(1)
scanner.py
import zmq
if __name__ == '__main__'
:
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5555")
print "I: Scanning 5555"
while True:
message = socket.recv_multipart()
print "I: Receiving: {}".format(message)
一个订阅者必须先连接到发布者,才能接收到任何消息。操作的顺序大致如下:
- 启动发布者(“启动”意味着定义一个套接字并进行绑定或连接,对于订阅者来说,就是订阅你想要的消息)。
- 启动订阅者(第1步和第2步可以任意顺序进行)。
- 发送/接收你的消息。
- 关闭所有操作。
PUB
套接字是非阻塞的,它不会等到有订阅者来发送消息,而是直接把消息放进队列里。如果没有订阅者在等着接收,它就会把消息丢掉。你可以查看pyzmq的发布/订阅示例,你会看到订阅者必须先自我声明,发布者才能发送消息并确保消息会被接收。
如果你知道你的订阅者在指定的时间内总是可用,你可以用超时来“伪装”一下,否则发布/订阅就无法正常工作。
实际上,你的PUB
套接字已经绑定了,但订阅者连接需要一些时间,即使你先启动了它,所以在你发送消息时,订阅者可能还没准备好。消息会被丢掉,然后你就关闭所有操作,这时订阅者可能还没完全连接上。
你需要使用另一种类型的套接字对,比如rep/req
,这样可以实现阻塞行为,直到接收者准备好接收消息。这可能意味着使用一个中央服务器来控制哪些对等方可用,而不是试图将信息广播给所有对等方,或者根据你的消息需求和架构选择其他方案。或者,你需要一个更长生命周期的发布/订阅系统。