zeromq 持久化模式
谁来管理ZeroMQ中的持久性?
当我们在Python语言中使用ZeroMQ客户端时,有哪些插件或模块可以用来管理持久性?
我想了解使用ZeroMQ的模式。
3 个回答
2
我们需要在处理消息之前,先把从订阅者那里收到的消息保存下来。这些消息是在一个单独的线程中接收的,并且会存储到磁盘上,而保存下来的消息队列则是在主线程中进行操作的。
这个模块可以在这里找到:https://pypi.org/project/persizmq。以下是一些文档内容:
import pathlib
import zmq
import persizmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt_string(zmq.SUBSCRIBE, "")
subscriber.connect("ipc:///some-queue.zeromq")
persistent_dir = pathlib.Path("/some/dir")
storage = persizmq.PersistentStorage(persistent_dir=persistent_dir)
def on_exception(exception: Exception)->None:
print("an exception in the listening thread: {}".format(exception))
with persizmq.ThreadedSubscriber(
callback=storage.add_message, subscriber=subscriber,
on_exception=on_exception):
msg = storage.front() # non-blocking
if msg is not None:
print("Received a persistent message: {}".format(msg))
storage.pop_front()
3
在应用程序的端,你可以根据需要保存数据。例如,我在node.js中建立了一个数据持久层,它可以和后端的php进行通信,还能通过websockets进行交流。
这个持久化的部分会保存消息一段时间(可以参考这个链接:http://en.wikipedia.org/wiki/Time_to_live),这样可以给客户端一个连接的机会。我使用了内存中的数据结构,但我也考虑过用redis来实现数据的磁盘持久化。
10
据我所知,Zeromq 是没有任何持久化功能的。这不是它的范围内的事情,需要用户自己来处理。就像把消息进行序列化一样。在 C# 中,我使用过 db4o 来实现持久化。通常我会把对象以原始状态保存,然后再把它序列化,最后发送到 ZMQ 的套接字里。顺便说一下,这是用于发布/订阅的配对。