精心设计的库,以响应式的方式处理连续的数据流,具有发布/订阅和代理功能。
broqer的Python项目详细描述
最初对嵌入式系统的关注broqer可以用于需要处理连续数据流的任何地方,而且它们无处不在。小心!
概要
- 无依赖关系的纯python实现
- 根据麻省理工学院许可证(2018 Günther Jena)
- 源位于GitHub.com
- 文档位于ReadTheDocs.com
- 在python 3.5、3.6、3.7和3.8-dev上测试
- 用pytest测试单元,用Flake8检查编码样式,用mypy检查静态类型,用Pylint检查静态代码,用Sphinx记录
- 从ReactiveX和其他流式处理框架(如Map,CombineLatest,…)知道的运算符
- 代理功能通过Hub
- 集中对象以跟踪发布者和订阅者
- 使用微服务体系结构构建应用程序的起点
展示
在其他框架中,publisher有时称为oberservable。a订户 能够观察发布服务器发出的更改。有了这些基础知识 能够使用观察者模式-让我们看看!
观察者模式
订阅发布服务器是通过|运算符完成的,这里用作管道。 一个简单的订阅服务器是op.Sink,它正在调用一个具有可选位置的函数 和关键字参数。
>>>frombroqerimportValue,op>>>a=Value(5)# create a value (publisher and subscriber with state)>>>disposable=a|op.Sink(print,'Change:')# subscribe a callbackChange:5>>>a.emit(3)# change the valueChange:3>>>disposable.dispose()# unsubscribe
将发布服务器与算术运算符结合
通过将两个发布者与 常用的运算符(如+,>,<<,…)。
>>>frombroqerimportValue,op>>>a=Value(1)>>>b=Value(3)>>>c=a*3>b# create a new publisher via operator overloading>>>c|op.Sink(print,'c:')c:False>>>a.emit(1)# will not change the state of c>>>a.emit(2)c:True
也可以通过索引或键获取项目:
>>>i=Value('a')>>>d=Value({'a':100,'b':200,'c':300})>>>d[i]|op.Sink(print,'r:')r:100>>>i.emit('c')r:300>>>d.emit({'c':123})r:123
一些python内置函数无法返回发布服务器(例如len()需要 返回一个整数)。在这种情况下,在broqer中定义了特殊函数:Str, Int、Float、Len和In(用于x in y)。其他功能 为了方便起见,可以使用:All、Any、BitwiseAnd和BitwiseOr。
对发布服务器的属性访问正在生成一个发布服务器,其中 访问是在发出值时完成的。出版商必须知道,它应该是哪种类型 模拟-这是通过.inherit_type(type)完成的。
>>>i=Value('Attribute access made REACTIVE')>>>i.inherit_type(str)>>>i.lower().split(sep=' ')|op.Sink(print)['attribute','access','made','reactive']>>>i.emit('Reactive and pythonic')['reactive','and','pythonic']
异步支持
很多操作符都是为异步操作而设计的。你可以放松 节流阀发出(通过op.Debounce和op.Throttle),采样和延迟 (通过op.Sample和op.Delay)或启动协同程序,并在完成 结果将被发出。
>>>asyncdeflong_running_coro(value):...awaitasyncio.sleep(3)...returnvalue+1...>>>a=Value(0)>>>a|op.MapAsync(long_running_coro)|op.Sink(print,'Result:')
3秒钟后,结果将是:
Result: 0
MapAsync支持各种模式,在协同路由时如何处理新的emit 正在运行。默认值是一个并发的协同程序运行,但也包括各种队列 或中断模式可用。
每个出版商都可以在联程中等待:
awaitsignal_publisher
函数装饰器
使用函数装饰器动态创建自己的运算符。装饰师是 可用于Accumulate,CombineLatest,Filter,Map,MapAsync, MapThreaded、Reduce和Sink。
>>>@build_map...defcount_vowels(s):...returnsum([s.count(v)forvin'aeiou'])>>>msg=Value('Hello World!)>>>msg|count_vowels()|Sink(print,'Number of vowels:')Numberofvowels:3>>>msg.emit('Wahuuu')Numberofvowels:4
您甚至可以设置Maps和Filters:
>>>importre>>>@build_filter...deffilter_pattern(pattern,s):...returnre.search(pattern,s)isnotNone>>>msg=Value('Cars passed: 135!')>>>msg|filter_pattern('[0-9]*')|Sink(print)Carspassed:135!>>>msg.emit('No cars have passed')>>>msg.emit('Only 1 car has passed')Only1carhaspassed
安装
pip install broqer
学分
布罗克的灵感来自:
- RxPY:python的反应式扩展(通过b_rge lanes和dag brattli)
- aioreactive:python的异步/等待反应式工具(由dag brattli提供)
- streamz:构建管道来管理连续的数据流(由matthew rocklin撰写)
- MQTT:M2M连接协议
- 弗洛里安·弗尔斯坦:花几个小时的时间讨论,想出好主意,帮助我理解这些概念!
API
出版商
aPublisher是消息的源。
使用asyncio事件循环:
Publisher () | Basic publisher |
StatefulPublisher (init) | Publisher keeping an internal state |
FromPolling (interval, func, …) | Call ^{tt34}$ periodically and emit the returned values |
歌剧院TOR
Accumulate (func, init) | Apply ^{tt35}$ which is returning new state and value to emit |
Cache (*init) | Caching the emitted values to access it via ^{tt36}$ property |
CatchException (*exceptions) | Catching exceptions of following operators in the pipelines |
CombineLatest (*publishers) | Combine the latest emit of multiple publishers and emit the combination |
Filter (predicate, …) | Filters values based on a ^{tt37}$ function |
Map (map_func, *args, **kwargs) | Apply ^{tt38}$ to each emitted value |
Merge (*publishers) | Merge emits of multiple publishers into one stream |
Partition (size) | Group ^{tt39}$ emits into one emit as tuple |
Reduce (func, init) | Apply ^{tt40}$ to the current emitted value and the last result of ^{tt40}$ |
Replace (value) | Replace each received value by the given value |
SlidingWindow (size, …) | Group ^{tt39}$ emitted values overlapping |
Switch (mapping) | Emit selected source mapped by ^{tt43}$ |
使用asyncio事件循环:
Debounce (duetime, *reset_value) | Emit a value only after a given idle time (emits meanwhile are skipped) |
Delay (delay) | Emit every value delayed by the given time |
MapAsync (map_coro, mode, …) | Apply ^{tt45}$ to each emitted value allowing async processing |
MapThreaded (map_func, mode, …) | Apply ^{tt46}$ to each emitted value allowing threaded processing |
Sample (interval) | Emit the last received value periodically |
Throttle (duration) | Rate limit emits by the given time |
订户
aSubscriber是消息的接收器。
Sink (func, *args, **kwargs) | Apply ^{tt47}$ to each emitted value |
SinkAsync (coro, …) | Start ^{tt48}$ like MapAsync |
OnEmitFuture (timeout=None) | Build a future able to await for |
hub.utils.TopicMapper (d) | Update a dictionary with changes from topics |
Trace (d) | Debug output for publishers |