zeromq 持久化模式

15 投票
3 回答
12020 浏览
提问于 2025-04-16 06:19

谁来管理ZeroMQ中的持久性?

当我们在Python语言中使用ZeroMQ客户端时,有哪些插件或模块可以用来管理持久性?

我想了解使用ZeroMQ的模式。

3 个回答

2

我们需要在处理消息之前,先把从订阅者那里收到的消息保存下来。这些消息是在一个单独的线程中接收的,并且会存储到磁盘上,而保存下来的消息队列则是在主线程中进行操作的。

这个模块可以在这里找到:https://pypi.org/project/persizmq。以下是一些文档内容:

import pathlib

import zmq

import persizmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")

persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)

def on_exception(exception: Exception)->None:
    print("an exception in the listening thread: {}".format(exception))

with persizmq.ThreadedSubscriber(
    callback=storage.add_message, subscriber=subscriber, 
    on_exception=on_exception):

    msg = storage.front()  # non-blocking
    if msg is not None:
        print("Received a persistent message: {}".format(msg))
        storage.pop_front()
3

在应用程序的端,你可以根据需要保存数据。例如,我在node.js中建立了一个数据持久层,它可以和后端的php进行通信,还能通过websockets进行交流。

这个持久化的部分会保存消息一段时间(可以参考这个链接:http://en.wikipedia.org/wiki/Time_to_live),这样可以给客户端一个连接的机会。我使用了内存中的数据结构,但我也考虑过用redis来实现数据的磁盘持久化。

10

据我所知,Zeromq 是没有任何持久化功能的。这不是它的范围内的事情,需要用户自己来处理。就像把消息进行序列化一样。在 C# 中,我使用过 db4o 来实现持久化。通常我会把对象以原始状态保存,然后再把它序列化,最后发送到 ZMQ 的套接字里。顺便说一下,这是用于发布/订阅的配对。

撰写回答