为zeromq提供持久性。

persizmq的Python项目详细描述


波斯马克

persizmq为zeromq提供持久性。消息在后台接收并存储在磁盘上,然后再进一步 操纵。

目前,我们只支持ZeroMQ订阅服务器。添加对其他类的支持很容易;我们只需要 到目前为止还不需要他们。

用法

订户

持久订户包装ZeroMQ订户。我们将持久性订阅分为两个组件: 在后台侦听消息的线程订阅服务器,以及存储消息的持久性组件 在磁盘上。

线程订阅服务器

线程订阅服务器实现为persizmq.ThreadedSubscriber。您需要指定一个回调 每次收到消息时调用。

为了处理侦听线程中引发的异常,还需要指定on exception callback。

示例:

importtimeimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")defcallback(msg:bytes)->None:print("received a message: {}".format(msg))defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=callback,subscriber=subscriber,on_exception=on_exception):# do something while we are listening on messages...time.sleep(10)

储存

我们为收到的信息提供两种存储模式:

  1. persizmq.PersistentStorage:将消息存储在磁盘上的fifo队列中。
  2. persizmq.PersistentLatestStorage:将最新消息单独存储在磁盘上。

存储组件作为回调直接传递给线程订阅服务器。

示例:

importpathlibimportzmqimportpersizmqcontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(callback=storage.add_message,subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()

过滤

我们还提供了可以链接到线程订阅服务器上的过滤组件。过滤链是 如果您只想保留少量的消息而忽略其余的消息,那么使用起来特别方便。

过滤器在persizmq.filter模块中实现。

示例:

importpathlibimportzmqimportpersizmqimportpersizmq.filtercontext=zmq.Context()subscriber=context.socket(zmq.SUB)subscriber.setsockopt_string(zmq.SUBSCRIBE,"")subscriber.connect("ipc:///some-queue.zeromq")persistent_dir=pathlib.Path("/some/dir")storage=persizmq.PersistentStorage(persistent_dir=persistent_dir)defon_exception(exception:Exception)->None:print("an exception was raised in the listening thread: {}".format(exception))withpersizmq.ThreadedSubscriber(lambdamsg:storage.add_message(persizmq.filter.MaxSize(max_size=1000)(msg)),subscriber=subscriber,on_exception=on_exception):msg=storage.front()# non-blockingifmsgisnotNone:print("Received a persistent message: {}".format(msg))storage.pop_front()

安装

  • 创建虚拟环境:
python3 -m venv venv3
  • 启动:
source venv3/bin/activate
  • 使用pip:
  • 安装persizmq
pip3 install persizmq

开发

  • 查看存储库。
  • 在存储库根目录中,创建虚拟环境:
python3 -m venv venv3
  • 激活虚拟环境:
source venv3/bin/activate
  • 安装开发依赖项:
pip3 install -e .[dev]
  • 我们用毒物测试和包装分发。假设虚拟环境已激活并且 开发依赖项已安装,运行:
tox
  • 我们还提供了一组预提交检查,lint和检查代码的格式。从激活的 具有开发依赖关系的虚拟环境:
./precommit.py
  • 预提交脚本还可以自动格式化代码:
./precommit.py  --overwrite

版本控制

我们跟着Semantic Versioning。版本x.y.z表示:

  • X是主要版本(向后不兼容),
  • y是次要版本(向后兼容),并且
  • z是修补程序版本(向后兼容的错误修复)。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java爬虫获取外部网站搜索结果   java Bluestack未连接到eclipse   java如何从ConstraintViolationException Hibernamte获取数据库字段名   HttpResponse HttpResponse=httpClient引发java运行时错误。执行(httpPost);   Jama中矩阵的java点积和叉积   java有什么方法可以唯一地识别可扩展设备吗?   java我需要用*来写我的名字,但我不断遇到一个错误,我对编码很陌生   java变量是在内部类中访问的。需要被宣布为最终决定。但我不想宣布最终结果   java如何缩短base64图像字符串,Android?   JavaSpringMVC:计划方法不自动触发   图形学习Java 2D API的好资源是什么?   如何在java中对方法进行排队   java JavaFX多行   java Selenium无法在[链接]上找到基于CSS元素的密码字段元素http://www.cartasi.it/gtwpages/index.jsp   Java中的equals()和hashCode()契约   软删除情况下的java Hibernate二级缓存   java为什么这段代码要两次调用这些方法?