Rx或与curio异步的可观察模式。

async-rx的Python项目详细描述


异步接收

Unix Build StatusCodacy BadgeCoverage StatusScrutinizer Code QualityPyPI VersionPyPI License

Semantic Versioning之后的版本

概述

应用服务器端的“rx”alias“react”alias“the power of observable pattern and his children”的免费实现。在

实施基于:

  • 基于curio框架的异步函数
  • 用于类型检查的用户协议声明
  • 我们的朋友:诗歌,雪花8,黑色,isort,pytest,mypy,sphinx,tox,travis。。。在
  • 一种名贵的味道
  • 很多闭变量,clojure函数,内部函数
  • 好奇心
  • 摇一摇,再摇一点,塔达!在

安装

将此库直接安装到激活的虚拟环境中:

$ pip install async-rx

或将其添加到您的Poetry项目中:

^{pr2}$

关于Python版本的说明:

  • python代码和测试使用python3.8+
  • 键入扩展为我们提供了可选的python<;3.8支持

API和用法

安装后,可以导入包:

$ python
>>> import async_rx
>>> async_rx.__version__

看看documentation和{a10}。在

Function NameDescription
rx_observer(on_next, on_error, on_completed)Return an observer.
rx_observer_from(observer, on_next, …)Build an observer from another one.
rx_collector(initial_value)Create an observer collector.
rx_create(subscribe, ensure_contract, …)Create an observable with specific delayed execution ‘subscribe’.
rx_defer(observable_factory)Create an observable when a subscription occurs.
rx_distinct(observable, frame_size)Create an observable which send distinct event inside a windows of size #frame_size.
rx_empty()Create an empty Observable.
rx_filter(observable, predicate, …)Create an observable which event are filtered by a predicate function.
rx_first(observable)Create an observale which only take the first event and complete.
rx_forward(observable, except_complet, …)Create an observable wich forward event.
rx_from(observable_input)Convert almost anything to an Observable.
rx_last(observable, count)Create an observale which only take #count (or less) last events and complete.
rx_of(*args)Convert arguments into an observable sequence.
rx_range(start, stop, step)Create an observable sequence of range.
rx_skip(observable, count)Create an obervable wich skip #count event on source.
rx_take(observable, count)Create an observable which take only first #count event maximum (could be less).
rx_throw(error)Create an observable wich always call error.
rx_reduce(observable, accumulator, …)Create an observable which reduce source with accumulator and seed value.
rx_count(observable)Create an observable wich counts the emissions on the source and emits result.
rx_max(observable)Create an observable wich returns the maximal item in the source when completes.
rx_min(observable)Create an observable wich returns minimal item in the source when completes.
rx_sum(observable)Create an observable wich return the sum items in the source when completes.
rx_avg(observable)Create an observable wich return the average items in the source when completes.
rx_buffer(observable, buffer_size)Buffer operator.
rx_window(observable, buffer_size)Window operator.
rx_merge(*observables)Flattens multiple Observables together by blending their values into one Observable.
rx_concat(*observables)Concat operator.
rx_zip(*observables)Combine multiple Observables to create an Observable.
rx_amb(*observables)Amb operator.
rx_map(observable, transform, …)Map operator.
rx_merge_map(*observables, transform)Merge map operator.
rx_group_by(observable, key_selector)Group by operator.
rx_sample(observable, duration)Sample operator used to rate-limit the sequence.
rx_throttle(observable, duration)Throttle operator.
rx_delay(observable, duration, buffer_size, …)Delay operator.
rx_debounce(an_observable, duration)Debounce operator.
rx_dict(initial_value)Create an observable on dictionnary.
rx_list(initial_value)Create an observable on list.
rx_repeat(duration, producer)Repeat data.
rx_repeat_series(source, ratio)Repeat a series (delay, value) as an observable for each subscription.
rx_subject(subject_handler)Create a subject.
rx_subject_from(a_subject, subscribe, …)Build a subject from another one by override some function.
rx_subject_replay(buffer_size, subject_handler)Create a replay subject.
rx_subject_behavior(subject_handler)Create a behavior subject.
rx_publish(an_observable, subject_handler, …)Create a Connectable Observable.
rx_publish_replay(an_observable, …)Create a publish_replay.
rx_publish_behavior(an_observable, …)Create a publish_behavior.

短样本

有了这个惊人的观察者:

classObserverCounterCollector:def__init__(self):self.on_next_count=0self.on_completed_count=0self.on_error_count=0self.items:Any=list([])# a bad idea isn't itasyncdefon_next(self,item:Any)->None:"""Process item."""self.items.append(item)self.on_next_count+=1asyncdefon_completed(self)->None:"""Signal completion of this observable."""self.on_completed_count+=1asyncdefon_error(self,err:Any)->None:self.on_error_count+=1

我们将选择奇数:

asyncdef_predicate(item:int)->bool:returnitem%2==0seeker=ObserverCounterCollector()observable=rx_range(start=0,stop=100)# create an observable of [0, 1, ..., 99]sub=awaitrx_filter(observable=observable,predicate=_predicate).subscribe(an_observer=seeker)# filter and subscribesub()# release resource# we have :assertseeker.on_next_count==50assertseeker.on_completed_count==1assertseeker.on_error_count==0assertseeker.items[0:6]==[0,2,4,6,8,10]

你的新产品react/rx想尝尝吗?在

第一个问题:从哪里开始?在

如果你读过这个页面,你可能曾经在google&co上做过大量的搜索,而且可能和我一样松散 在html/js/whatever中的react组件。 我不能给你最好的解释,但是。。。阿姆波夫,如果你想知道史拉格在幕后是怎么做的,你应该记住:

  • 什么是可观察模式(或侦听器、别名回调)
  • 什么是事件发射器(发送事件的东西?)在

好吧,记住这一点:

  • 当观察者订阅某个可观察对象的事件时,他接收到该事件
  • 事件包括“on_next(item)”、“on_completed()”和错误时(错误)”

去看看”协议.py“然后回来:)

你看到Observer/observate/XXXHandler/Subscribe了吗?以及订阅(是的,会要求取消订阅)?

所以rx_from([1, 2, ...])创建一个observable,当观察者订阅列表时,它将按顺序发送列表项。 花点时间看看测试单元:) 你可以进入可观察模块,看到所有的接收来自,接收延迟,接收最后。。。在

但是,什么是奇怪的“主题”?在

它就像一个观察者和一个观察者,它可以将一个观察者的项目多播到几个观察者。 因为它是一个观察者,它可以从某处接收数据。 因为它是可观察的,所以观察者可以订阅它。在

看看主题模块和测试单元,看看什么是重放主题,有趣的不是吗? 请参见函数subject。在

最后为商品:大炮,“ConnectableObservable”别名多播。天啊,他们杀了肯尼!在

它就像一个主题,你可以连接/断开你想要的或自动(通过调用ref_count)。 “连接”意味着主体开始接收可观察对象,因此项目将发送给观察者。在

我希望这能帮你一点忙:)

参考文献

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java JTable无法向新创建的列添加值   java如何调整JEditorPane中编辑区域的大小?   Java通过反射确定未知数组中的数组大小   java Intellij Idea有时无法按其预期的方式构建应用程序   java Swing GUI带有IntelliJ错误“contentPane不能设置为null”从终端编译时   如何将这些通用方法调用从C#转换为Java   在null上找不到java属性或字段“index”   从Java HashMap获取整数值时是否需要调用intValue()方法?   java Android谷歌地图获取相机中的图像块   unix无法捕获JAVA中“who m”命令的输出   java,同时将邮件发送到“收件人”标题“我”中的多个收件人   在java中向链表添加未知数量的节点   无法为Heroku上的discord bot设置java端口   java使用Apache HttpClient进行选项请求   与元素类型“ApplicationName”关联的属性“Application Version”需要java Open quote   Android Studio Java中的两个变量求和