(py)zmq/PUB:可以立即调用connect()再send()而不丢失消息吗?

3 投票
3 回答
2461 浏览
提问于 2025-04-18 15:40

这段代码让我总是收不到消息:

def publish(frontend_url, message):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(frontend_url)
    socket.send(message)

不过,如果我在代码中加一个短暂的 sleep(),我就能收到消息:

def publish(frontend_url, message):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect(frontend_url)
    time.sleep(0.1)  # wait for the connection to be established 
    socket.send(message)

有没有办法确保消息在不需要 sleep() 的情况下就能送达呢?

我担心我无法预测 sleep 的时间(比如网络延迟等等)。

更新:

背景:我想把数据更新从一个 Flask 的 REST 应用发布到一个消息中介(比如在资源创建、更新或删除时)。

目前,我的消息中介是用 0mq 的 FORWARDER 设备搭建的。

我知道 0mq 是为了简化 TCP 套接字和消息传递的复杂性而设计的。

在连接可以长时间保持的情况下,我可以使用它。但是,当我在像 gunicorn 或 uwsgi 这样的应用容器中运行我的 Flask 应用时,我有 N 个工作进程,我不能指望连接或进程能长时间保持。

根据我的理解,我应该使用一个真正的消息中介(比如 RabbitMQ),并使用一个同步客户端把消息发布到那里。

3 个回答

-4

这里有很多帖子开头都是这样的:

“我用了.PUB/.SUB,但它没有达到我想要的效果……有没有人能帮我让它像我想的那样工作?”

这种方法在现实中并不好用,尤其是在分布式系统设计中,尤其是在需要近实时调度和/或紧张资源管理的情况下,这些都是无法避免的。

进程间/平台间的消息传递并不是“仅仅”一行简单代码

# A sample demo-code snippet          # Issues the demo-code has left to be resolved
#------------------------------------ #------------------------------------------------
def publish( frontend_url, message ): # what is a benefit of a per-call OneStopPUBLISH function?
    context = zmq.Context()           # .Context() has to be .Terminate()-d (!)
    socket = context.socket(zmq.PUB)  # is this indeed "a disposable" for each call?
    socket.connect(frontend_url)      # what transport-class used for .connect()/.bind()?
    time.sleep(0.1)                   # wait for the connection to be established 
    socket.send(message)              # ^ has no control over low-level "connection" handshaking

任何人都可以写几行简单的代码,花点时间(自己做或找社区帮忙)让它最终工作(至少能勉强用)。

但是,这个领域的潜力巨大,因此需要调整一下思维,才能充分发挥它的潜力。

如果你对一个好的解决方案有需求,但基础不对或者对代码的理解有误(无论是复制粘贴还是其他),通常不会得到什么合理的结果,尤其是对未来的规划。

消息传递引入了一种新的范式——一个新的宏观体系——在更大范围内构建自动化,令人惊讶的是,你的(确定性)代码变成了一个更复杂的有限状态自动机(FSA)的一部分,这些自动机之间会“交流”。

为此,需要一些[本地资源管理],一些“外部”[传输],以及一些“正式的行为模型礼仪”(避免互相打扰)[通信原语]。

这些通常是ZeroMQ、nanomsg和其他库内置的。

然而,有两个重要的事情是隐藏的。

  • 微观世界中事物是如何内部运作的(许多,甚至所有,试图调整这一点的努力,通常都是浪费时间,而不是尽力去正确使用它)
  • 宏观世界中如何将一群看似简单的元素[通信原语]组织成一个强大、可扩展的消息传递架构,这个架构能够跨越进程/本地主机/网络边界,并满足整体设计需求。

如果不理解这两个世界之间的距离,通常会导致对我们在消息传递库中获得的巨大优势的糟糕利用。

最好的做法就是忘掉那些一行代码调整的方法。这并不高效。

首先理解全局视角,可以帮助你利用最适合你目标的力量。

为什么这么复杂?

enter image description here

( 图片来源:nanomsg.org )

任何非简单的系统都是复杂的。在时间和资源两个方面都如此。尤其是当你努力创建一个稳定、智能、高性能、低延迟、与传输类无关的通用通信框架时。

好消息是,这些已经在微观架构中得到了详细阐述和内置。

坏消息是,这并不能直接满足你的需求(除非是一些非常简单的情况)。

这就是我们要讨论的宏观设计。

你需要设计一个更高层次的算法,让许多孤立的FSA原语能够交流并达成一致,以适应不断发展的多对多对话。是的,库给你提供的“仅仅”是原始的构建块(毫无疑问非常强大)。但让“外部空间”为你的需求服务是你的责任。

这通常是复杂的。

如果这很简单,那它很可能早就被包含在库里面了,不是吗?

接下来该怎么做?

也许接下来的最佳步骤是,朝着更全球化的视角迈进,虽然在用ZeroMQ编程的前几步可能会听起来很复杂,但如果你至少跳到第265页,看看Pieter Hintjens的书《Code Connected, Volume 1》,即使不是逐步阅读,也会有所帮助。

你会开始意识到,如何“编程”FSA原语的宏观体系,从而形成一个更高阶的FSA集合,能够解决所有特定问题。

首先查看图60 重新发布更新图62 HA克隆服务器对,然后再回到基础、元素和细节。

3

来自ZMQ指南

还有一件关于PUB-SUB套接字的重要事情需要了解:你无法确切知道一个订阅者什么时候开始接收消息。即使你先启动了订阅者,等了一会儿再启动发布者,订阅者也总是会错过发布者发送的第一条消息。这是因为在订阅者连接到发布者的过程中(这个过程需要一点时间),发布者可能已经开始发送消息了。

5

你不能完全这样做,但可能有其他解决方案可以解决你的问题。

你为什么要使用 PUB/SUB 套接字呢?其实,发布/订阅的方式更适合长时间运行的连接。通常情况下,你会在 PUB 套接字上进行绑定,而在 SUB 套接字上进行连接。你现在的做法是启动一个套接字来发送一条消息,可能是给某种“服务器”,这其实不太符合 PUB/SUB 的使用方式。

如果你选择使用 REQDEALER 这样的变体,配合 REPROUTER,那么事情可能会更顺利。REQ 套接字会在它的配对套接字准备好接收消息之前,先把消息保存起来。如果你不太在乎“服务器”的响应,那你可以直接忽略它。

有没有什么特别的原因让你不直接保持套接字打开,而是每次发送消息时都要重新建立一个新的上下文和套接字?我能想到一些有限的情况可能会需要这样做,但一般来说,保持套接字一直打开会更好。如果你想继续使用 PUB/SUB,那么可以在应用程序启动时就创建套接字,等待一段安全的时间以应对合理的延迟,然后开始发送消息,而不必每次都担心重新连接。如果你打算长时间保持这个套接字而没有新消息,可能还需要使用心跳机制来确保连接保持畅通。

撰写回答