精心设计的库,以响应式的方式处理连续的数据流,具有发布/订阅和代理功能。

broqer的Python项目详细描述


https://img.shields.io/pypi/v/broqer.svghttps://img.shields.io/travis/semiversus/python-broqer/master.svghttps://readthedocs.org/projects/python-broqer/badge/?version=latesthttps://codecov.io/gh/semiversus/python-broqer/branch/master/graph/badge.svghttps://img.shields.io/github/license/semiversus/python-broqer.svg

最初对嵌入式系统的关注broqer可以用于需要处理连续数据流的任何地方,而且它们无处不在。小心!

https://cdn.rawgit.com/semiversus/python-broqer/7beb7379/docs/logo.svg

概要

  • 无依赖关系的纯python实现
  • 根据麻省理工学院许可证(2018 Günther Jena)
  • 源位于GitHub.com
  • 文档位于ReadTheDocs.com
  • 在python 3.5、3.6、3.7和3.8-dev上测试
  • pytest测试单元,用Flake8检查编码样式,用mypy检查静态类型,用Pylint检查静态代码,用Sphinx记录
  • ReactiveX和其他流式处理框架(如MapCombineLatest,…)知道的运算符
  • 代理功能通过Hub
    • 集中对象以跟踪发布者和订阅者
    • 使用微服务体系结构构建应用程序的起点

展示

在其他框架中,publisher有时称为oberservable。a订户 能够观察发布服务器发出的更改。有了这些基础知识 能够使用观察者模式-让我们看看!

观察者模式

订阅发布服务器是通过|运算符完成的,这里用作管道。 一个简单的订阅服务器是op.Sink,它正在调用一个具有可选位置的函数 和关键字参数。

>>>frombroqerimportValue,op>>>a=Value(5)# create a value (publisher and subscriber with state)>>>disposable=a|op.Sink(print,'Change:')# subscribe a callbackChange:5>>>a.emit(3)# change the valueChange:3>>>disposable.dispose()# unsubscribe

将发布服务器与算术运算符结合

通过将两个发布者与 常用的运算符(如+><<,…)。

>>>frombroqerimportValue,op>>>a=Value(1)>>>b=Value(3)>>>c=a*3>b# create a new publisher via operator overloading>>>c|op.Sink(print,'c:')c:False>>>a.emit(1)# will not change the state of c>>>a.emit(2)c:True

也可以通过索引或键获取项目:

>>>i=Value('a')>>>d=Value({'a':100,'b':200,'c':300})>>>d[i]|op.Sink(print,'r:')r:100>>>i.emit('c')r:300>>>d.emit({'c':123})r:123

一些python内置函数无法返回发布服务器(例如len()需要 返回一个整数)。在这种情况下,在broqer中定义了特殊函数:StrIntFloatLenIn(用于x in y)。其他功能 为了方便起见,可以使用:AllAnyBitwiseAndBitwiseOr

对发布服务器的属性访问正在生成一个发布服务器,其中 访问是在发出值时完成的。出版商必须知道,它应该是哪种类型 模拟-这是通过.inherit_type(type)完成的。

>>>i=Value('Attribute access made REACTIVE')>>>i.inherit_type(str)>>>i.lower().split(sep=' ')|op.Sink(print)['attribute','access','made','reactive']>>>i.emit('Reactive and pythonic')['reactive','and','pythonic']

异步支持

很多操作符都是为异步操作而设计的。你可以放松 节流阀发出(通过op.Debounceop.Throttle),采样和延迟 (通过op.Sampleop.Delay)或启动协同程序,并在完成 结果将被发出。

>>>asyncdeflong_running_coro(value):...awaitasyncio.sleep(3)...returnvalue+1...>>>a=Value(0)>>>a|op.MapAsync(long_running_coro)|op.Sink(print,'Result:')

3秒钟后,结果将是:

Result: 0

MapAsync支持各种模式,在协同路由时如何处理新的emit 正在运行。默认值是一个并发的协同程序运行,但也包括各种队列 或中断模式可用。

每个出版商都可以在联程中等待:

awaitsignal_publisher

函数装饰器

使用函数装饰器动态创建自己的运算符。装饰师是 可用于AccumulateCombineLatestFilterMapMapAsyncMapThreadedReduceSink

>>>@build_map...defcount_vowels(s):...returnsum([s.count(v)forvin'aeiou'])>>>msg=Value('Hello World!)>>>msg|count_vowels()|Sink(print,'Number of vowels:')Numberofvowels:3>>>msg.emit('Wahuuu')Numberofvowels:4

您甚至可以设置Maps和Filters:

>>>importre>>>@build_filter...deffilter_pattern(pattern,s):...returnre.search(pattern,s)isnotNone>>>msg=Value('Cars passed: 135!')>>>msg|filter_pattern('[0-9]*')|Sink(print)Carspassed:135!>>>msg.emit('No cars have passed')>>>msg.emit('Only 1 car has passed')Only1carhaspassed

安装

pip install broqer

学分

布罗克的灵感来自:

  • RxPY:python的反应式扩展(通过b_rge lanes和dag brattli)
  • aioreactive:python的异步/等待反应式工具(由dag brattli提供)
  • streamz:构建管道来管理连续的数据流(由matthew rocklin撰写)
  • MQTT:M2M连接协议
  • 弗洛里安·弗尔斯坦:花几个小时的时间讨论,想出好主意,帮助我理解这些概念!

API

出版商

aPublisher是消息的源。

使用asyncio事件循环:

Publisher ()Basic publisher
StatefulPublisher (init)Publisher keeping an internal state
FromPolling (interval, func, …)Call ^{tt34}$ periodically and emit the returned values

歌剧院TOR

Accumulate (func, init)Apply ^{tt35}$ which is returning new state and value to emit
Cache (*init)Caching the emitted values to access it via ^{tt36}$ property
CatchException (*exceptions)Catching exceptions of following operators in the pipelines
CombineLatest (*publishers)Combine the latest emit of multiple publishers and emit the combination
Filter (predicate, …)Filters values based on a ^{tt37}$ function
Map (map_func, *args, **kwargs)Apply ^{tt38}$ to each emitted value
Merge (*publishers)Merge emits of multiple publishers into one stream
Partition (size)Group ^{tt39}$ emits into one emit as tuple
Reduce (func, init)Apply ^{tt40}$ to the current emitted value and the last result of ^{tt40}$
Replace (value)Replace each received value by the given value
SlidingWindow (size, …)Group ^{tt39}$ emitted values overlapping
Switch (mapping)Emit selected source mapped by ^{tt43}$

使用asyncio事件循环:

Debounce (duetime, *reset_value)Emit a value only after a given idle time (emits meanwhile are skipped)
Delay (delay)Emit every value delayed by the given time
MapAsync (map_coro, mode, …)Apply ^{tt45}$ to each emitted value allowing async processing
MapThreaded (map_func, mode, …)Apply ^{tt46}$ to each emitted value allowing threaded processing
Sample (interval)Emit the last received value periodically
Throttle (duration)Rate limit emits by the given time

订户

aSubscriber是消息的接收器。

Sink (func, *args, **kwargs)Apply ^{tt47}$ to each emitted value
SinkAsync (coro, …)Start ^{tt48}$ like MapAsync
OnEmitFuture (timeout=None)Build a future able to await for
hub.utils.TopicMapper (d)Update a dictionary with changes from topics
Trace (d)Debug output for publishers

受试者

Subject ()Source with ^{tt49}$ method to publish a new message
Value (*init)Source with a state (initialized via ^{tt50}$)

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何为ConcurrentHashMap使用并设置适当的并发级别?   java泛型方法,运行时错误,   java在页面上显示加载的图像   java Paypal定期直接支付问题   java如何延迟重新绘制组件   JavaSpringBoot+Hibernate如何维护@Transient字段   java在其方法中获取关于类的信息   在java中将别名添加到枚举   java如何解决向google报告成绩时“需要重新连接客户端”的问题   清晰的java图像背景   java未找到适合JDateChooser的构造函数(字符串、字符串、字符)   java LRU缓存实现。某些测试用例的代码失败   if语句Java嵌套的if/Else条件   java JSoup“wrap”并非每次都按预期工作   Java Spring引导循环依赖于一个环境   ssl证书无法通过Java和IntelliJ连接到SOAP服务   带整数验证的Java扫描器   java在Flex中呈现具有动态列的datagrid   java Android:通过用户选择的选项将文件上载到服务器   子类中的java抛出错误、异常和运行时异常