ZeroMQ Pub/Sub action队列中的最后一个元素其他元素

2024-04-24 20:11:36 发布

您现在位置:Python中文网/ 问答频道 /正文

我开始在python中使用zeromq,并使用Publisher/Subscriber引用。但是,我没有找到任何关于如何处理队列中的消息的文档。我希望将最后收到的消息与队列的其他元素不同。在

示例

在publisher.py

import zmq
import random
import time

port = "5556"
topic = "1"

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

while True:
    messagedata = random.randrange(1,215)
    print "%s %d" % (topic, messagedata)
    socket.send("%s %d" % (topic, messagedata))
    time.sleep(.2)

在订阅服务器.py

^{pr2}$

我只想知道如何创建isLastMessage()文件中描述的isLastMessage()函数。如果在zeromq或解决方法中有什么直接的问题。在


Tags: pyimport消息topictime队列portcontext
3条回答

如果isLastMessage()要标识{}生成的消息流中的最后一条消息,那么这是不可能的,因为没有最后一条消息。publisher.py产生无限量的消息!在

但是,如果publisher.py知道其最后一条“真实”消息,即no while True:,它可以在之后发送一条“我完成”消息。在subscriber.py中识别这一点很简单。在

对不起,我会保留这个问题供参考。我刚找到答案,在文档中有一个NOBLOCK标志,可以添加到接收器。这样,recv命令不会阻塞。从answer的一部分提取的简单解决方法如下:

while True:
    try:
        #check for a message, this will not block
        message = socket.recv(flags=zmq.NOBLOCK)

        #a message has been received
        print "Message received:", message

    except zmq.Again as e:
        print "No message received yet"

至于真正的实现,我们不确定这是最后一次使用标志NOBLOCK并在进入exception块之后的最后一次调用。其含义如下:

^{pr2}$

欢迎来到非阻塞消息/信令的世界

对于任何严肃的分布式系统设计来说,这是一个重要的特性。在

如果您假设一个“最后”消息是通过管道中没有另一个消息,那么Poller()实例可能会帮助您的主事件循环,您可以在考虑管道“空”之前控制“等待”的时间量,而不是用零等待旋转循环破坏IO资源。在

显式信令总是更好的(如果您可以设计远程端行为)

接收方没有任何知识,接收到的“最后一条”消息的上下文是什么(建议从消息发送方广播显式信令),但是这有一个相反的特性,它指示ZeroMQ原型“内部”丢弃所有此类消息,这不是“最后一条”消息,因此减少了接收端的处理以纠正“最后一条”消息的可用性。在

aQuoteStreamMESSAGE.setsockopt( zmq.CONFLATE, 1 )

如果您可能想阅读更多关于ZeroMQ模式和反模式的内容,请不要错过pieterhintjens的精彩著作《代码连接,第1卷》(同样是pdf格式),并且可能希望更广泛地了解使用principally a non-blocking ZeroMQ approach的{a1}

相关问题 更多 >