为zeromq提供持久性。
persizmq的Python项目详细描述
波斯马克
persizmq为zeromq提供持久性。消息在后台接收并存储在磁盘上,然后再进一步 操纵。
目前,我们只支持ZeroMQ订阅服务器。添加对其他类的支持很容易;我们只需要 到目前为止还不需要他们。
用法
订户
持久订户包装ZeroMQ订户。我们将持久性订阅分为两个组件: 在后台侦听消息的线程订阅服务器,以及存储消息的持久性组件 在磁盘上。
线程订阅服务器
线程订阅服务器实现为persizmq.ThreadedSubscriber。您需要指定一个回调 每次收到消息时调用。
为了处理侦听线程中引发的异常,还需要指定on exception callback。
示例:
importtimeimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")defcallback(msg:bytes)->None:print("received a message: {}".format(msg))defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=callback,subscriber=subscriber,on_exception=on_exception):# do something while we are listening on messages...time.sleep(10)
储存
我们为收到的信息提供两种存储模式:
- persizmq.PersistentStorage:将消息存储在磁盘上的fifo队列中。
- persizmq.PersistentLatestStorage:将最新消息单独存储在磁盘上。
存储组件作为回调直接传递给线程订阅服务器。
示例:
importpathlibimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=storage.add_message,subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()
过滤
我们还提供了可以链接到线程订阅服务器上的过滤组件。过滤链是 如果您只想保留少量的消息而忽略其余的消息,那么使用起来特别方便。
过滤器在persizmq.filter模块中实现。
示例:
importpathlibimportzmqimportpersizmqimportpersizmq.filtercontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(lambdamsg:storage.add_message(persizmq.filter.MaxSize(max_size=1000)(msg)),subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()
安装
- 创建虚拟环境:
python3 -m venv venv3
- 启动:
source venv3/bin/activate
- 使用pip: 安装persizmq
pip3 install persizmq
开发
- 查看存储库。
- 在存储库根目录中,创建虚拟环境:
python3 -m venv venv3
- 激活虚拟环境:
source venv3/bin/activate
- 安装开发依赖项:
pip3 install -e .[dev]
- 我们用毒物测试和包装分发。假设虚拟环境已激活并且 开发依赖项已安装,运行:
tox
- 我们还提供了一组预提交检查,lint和检查代码的格式。从激活的 具有开发依赖关系的虚拟环境:
./precommit.py
- 预提交脚本还可以自动格式化代码:
./precommit.py --overwrite