ZeroMQ multipart PUB/SUB not working

7 votes
3 answers
7201 views
Asked 2025-04-16 19:00

Here is my script.


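#!/usr/bin/env python3

import sys
import traceback
from time import sleep

import zmq

print("Creating the zmq.Context")
context = zmq.Context()

print("Binding the publisher to the local socket at port 5557")
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

# The subscriber connects rather than binds, so it needs a concrete host;
# the "*" wildcard is only meaningful for bind().
print("Binding the subscriber to the local socket at port 5557")
receiver = context.socket(zmq.SUB)
receiver.connect("tcp://127.0.0.1:5557")

# SUBSCRIBE is a byte-prefix filter applied to the first frame of each message.
print("Setting the subscriber option to get only those originating from \"B\"")
receiver.setsockopt(zmq.SUBSCRIBE, b"B")

# bind()/connect() complete asynchronously; anything the PUB socket sends
# before the subscription has arrived is silently dropped.
print("Waiting a second for the socket to be created.")
sleep(1)

print("Sending messages")
for i in range(1, 10):
    msg = "msg %d" % i
    env = ["B", msg] if i % 2 == 0 else ["A", msg]
    print("Sending Message:  ", env)
    # pyzmq frames must be bytes under Python 3.
    sender.send_multipart([part.encode() for part in env])

print("Closing the sender.")
sender.close()

# Give the I/O thread a moment to deliver the queued messages, because the
# NOBLOCK receives below give up immediately when nothing is waiting.
sleep(0.1)

failed_attempts = 0
while failed_attempts < 3:
    try:
        frames = receiver.recv_multipart(zmq.NOBLOCK)
        print(str([frame.decode() for frame in frames]))
    except zmq.Again:
        # zmq.Again ("Resource temporarily unavailable"): no message is queued.
        print(traceback.format_exception(*sys.exc_info()))
        failed_attempts += 1

print("Closing the receiver.")
receiver.close()

print("Terminating the context.")
context.term()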

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in <module>\n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

So the question is: why doesn't this code work?

[EDIT] After getting a very quick reply on the zeromq mailing list, I have updated the code above.

3 Answers

0

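Alternatively, you can keep sending a stream of "hello" messages and have the subscriber send you an acknowledgement once it receives one. When that acknowledgement arrives, stop sending the test messages and start sending the real ones (docs).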
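A minimal sketch of that handshake, assuming Python 3 and a recent pyzmq. The endpoint names, the `hello`/`data`/`END` framing and the function names are hypothetical, chosen only for illustration. Because a SUB socket cannot send, the acknowledgement travels back over a separate PUSH/PULL pair (the Guide's syncpub/syncsub examples use a REQ/REP pair for the same job).

```python
import threading

import zmq

# Hypothetical endpoint names for this sketch.
PUB_ENDPOINT = "inproc://hello-pub"
ACK_ENDPOINT = "inproc://hello-ack"


def subscriber(ctx, received):
    """Acknowledge the first hello, then collect data bodies until END."""
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")  # accept every topic
    sub.connect(PUB_ENDPOINT)
    ack = ctx.socket(zmq.PUSH)  # SUB cannot send, so ack on a second socket
    ack.connect(ACK_ENDPOINT)

    # Block until a hello gets through, i.e. the subscription is live.
    while sub.recv_multipart()[0] != b"hello":
        pass
    ack.send(b"ack")

    while True:
        topic, body = sub.recv_multipart()
        if topic == b"hello":
            continue  # hellos that were already in flight
        if body == b"END":
            break
        received.append(body)
    sub.close()
    ack.close()


def publish_after_handshake(messages):
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind(PUB_ENDPOINT)
    ack = ctx.socket(zmq.PULL)
    ack.bind(ACK_ENDPOINT)

    received = []
    worker = threading.Thread(target=subscriber, args=(ctx, received))
    worker.start()

    # Keep broadcasting hello (every 50 ms) until the subscriber confirms.
    while not ack.poll(timeout=50):
        pub.send_multipart([b"hello", b""])
    ack.recv()

    # The subscriber is now attached, so the real data is not dropped.
    for body in messages:
        pub.send_multipart([b"data", body])
    pub.send_multipart([b"data", b"END"])

    worker.join()
    pub.close()
    ack.close()
    ctx.term()
    return received
```

For example, `publish_after_handshake([b"a", b"b"])` should return `[b'a', b'b']`: nothing real is lost, because data only starts flowing after the subscriber has provably received something.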

2

Although this thread is a bit old: if you are looking for a solution other than sleeping, you can use a monitor.

You can set a monitor callback that is invoked when a ZMQ_EVENT_CONNECTED event occurs.

For details and examples, see: http://api.zeromq.org/3-3:zmq-ctx-set-monitor
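The callback-style `zmq_ctx_set_monitor` in that link appears to come from the libzmq 3.x development line; released libzmq versions offer socket-level monitoring through `zmq_socket_monitor`, which pyzmq wraps as `Socket.get_monitor_socket()`. A minimal sketch, assuming Python 3 and a recent pyzmq built against libzmq 4.x; `connect_and_wait` and `demo` are hypothetical names:

```python
import zmq
from zmq.utils.monitor import recv_monitor_message


def connect_and_wait(sock, endpoint, timeout_ms=2000):
    """Connect `sock`, then block until libzmq reports EVENT_CONNECTED.

    Returns True if the event arrived within `timeout_ms`, otherwise False.
    """
    # Start monitoring before connect() so the event cannot be missed.
    monitor = sock.get_monitor_socket(zmq.EVENT_CONNECTED)
    try:
        sock.connect(endpoint)
        if monitor.poll(timeout_ms):
            event = recv_monitor_message(monitor)
            return event["event"] == zmq.EVENT_CONNECTED
        return False
    finally:
        sock.disable_monitor()
        monitor.close()


def demo():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    port = pub.bind_to_random_port("tcp://127.0.0.1")
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"B")
    try:
        return connect_and_wait(sub, "tcp://127.0.0.1:%d" % port)
    finally:
        sub.close(linger=0)
        pub.close(linger=0)
        ctx.term()
```

`demo()` returns True once the TCP connection is up. Note that EVENT_CONNECTED only says the TCP link exists; the subscription itself still reaches the PUB side asynchronously, so a message published the instant this returns can still be dropped. The monitor narrows the window rather than closing it; the hello/acknowledgement approach from the other answer closes it.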

11

From Chuck Remes:

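You may need a "sleep" between setting up the sockets (bind, connect, setsockopt) and actually sending messages. Binding and connecting are asynchronous, so they may not have completed by the time you start sending. In that case, any message sent through the PUB socket is dropped, because zmq_bind() does not create a queue until another socket has successfully connected to it.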

Also, FYI, you don't need to create two contexts here. Both sockets can be created in the same context. Using two does no harm, but it isn't necessary.

From Pieter:

The "problem solver" at the end of Chapter 1 of the Guide explains this.

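Some socket types (ROUTER and PUB) silently drop messages when there is no one to receive them. As Chuck said, connecting is asynchronous and takes around 100 ms. If you start two threads, one binding and one connecting, and immediately start sending data on such a socket, you will lose roughly the first 100 ms of data.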
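A small experiment that isolates the silent drop and the sleep workaround, assuming Python 3 and a recent pyzmq. The function name is hypothetical, and the 0.5 s delay is an arbitrary value picked for illustration, not a guaranteed-safe figure.

```python
import time

import zmq


def pub_drops_without_subscribers():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    port = pub.bind_to_random_port("tcp://127.0.0.1")

    # Nobody is connected yet: send() succeeds and raises nothing,
    # but the message is discarded because there is no queue for it.
    pub.send(b"early")

    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    sub.connect("tcp://127.0.0.1:%d" % port)

    # The crude "sleep" fix: let connect + subscription propagate first.
    time.sleep(0.5)
    pub.send(b"late")

    received = []
    while sub.poll(500):  # stop once nothing arrives for 500 ms
        received.append(sub.recv())

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return received
```

On an ordinary machine this typically returns `[b'late']`: the early message never appears, and no error was reported when it was sent.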

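Adding a sleep is the simple, brute-force way to "prove that it works". In practice you should synchronize somehow, or (more commonly) expect to lose messages as part of normal startup (that is, treat published data as a pure broadcast with no definite start or end).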

For details, see the weather update example, and syncpub and syncsub.
