为zeromq提供持久性。

persizmq的Python项目详细描述


波斯马克

persizmq为zeromq提供持久性。消息在后台接收并存储在磁盘上,然后再进一步 操纵。

目前,我们只支持ZeroMQ订阅服务器。添加对其他类的支持很容易;我们只需要 到目前为止还不需要他们。

用法

订户

持久订户包装ZeroMQ订户。我们将持久性订阅分为两个组件: 在后台侦听消息的线程订阅服务器,以及存储消息的持久性组件 在磁盘上。

线程订阅服务器

线程订阅服务器实现为persizmq.ThreadedSubscriber。您需要指定一个回调 每次收到消息时调用。

为了处理侦听线程中引发的异常,还需要指定on exception callback。

示例:

importtimeimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")defcallback(msg:bytes)->None:print("received a message: {}".format(msg))defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=callback,subscriber=subscriber,on_exception=on_exception):# do something while we are listening on messages...time.sleep(10)

储存

我们为收到的信息提供两种存储模式:

  1. persizmq.PersistentStorage:将消息存储在磁盘上的fifo队列中。
  2. persizmq.PersistentLatestStorage:将最新消息单独存储在磁盘上。

存储组件作为回调直接传递给线程订阅服务器。

示例:

importpathlibimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=storage.add_message,subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()

过滤

我们还提供了可以链接到线程订阅服务器上的过滤组件。过滤链是 如果您只想保留少量的消息而忽略其余的消息,那么使用起来特别方便。

过滤器在persizmq.filter模块中实现。

示例:

importpathlibimportzmqimportpersizmqimportpersizmq.filtercontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(lambdamsg:storage.add_message(persizmq.filter.MaxSize(max_size=1000)(msg)),subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()

安装

  • 创建虚拟环境:
python3 -m venv venv3
  • 启动:
source venv3/bin/activate
  • 使用pip:
  • 安装persizmq
pip3 install persizmq

开发

  • 查看存储库。
  • 在存储库根目录中,创建虚拟环境:
python3 -m venv venv3
  • 激活虚拟环境:
source venv3/bin/activate
  • 安装开发依赖项:
pip3 install -e .[dev]
  • 我们用毒物测试和包装分发。假设虚拟环境已激活并且 开发依赖项已安装,运行:
tox
  • 我们还提供了一组预提交检查,lint和检查代码的格式。从激活的 具有开发依赖关系的虚拟环境:
./precommit.py
  • 预提交脚本还可以自动格式化代码:
./precommit.py  --overwrite

版本控制

我们跟着Semantic Versioning。版本x.y.z表示:

  • X是主要版本(向后不兼容),
  • y是次要版本(向后兼容),并且
  • z是修补程序版本(向后兼容的错误修复)。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java对ServiceListener和ServiceTracker调用提供了哪些排序保证?   java找不到方法格式的符号(DateTimeFormatter)?   mysql有没有一种方法可以将TCPDump输出到一个文件中,并用Java对其进行过滤,每5秒钟用新数据覆盖一次该文件?   java如何最好地配置用户上传支持文件的上传位置   java我在Android上使用OData4j,我无法获取实体   JPA实体关系简单示例中的java获取错误   JAVANoClassDefFoundError:安卓。应用程序。用法安卓中的UsageStatsManager   Eclipse中javaoo代码分析   java MethodVisitor抛出类格式错误   java为什么在从ViewModel调用时,改型排队不起作用?   调试小程序Java控制台:删除跟踪消息大小限制   java复杂安卓活动动画   java如何在使用JDOM2解析XML时忽略注释内容   java通过循环创建文本字段   即使在bufferedwriter关闭后也未发现java文件异常   单链表恢复中的java错误