2024-06-06 23:37:03 发布
网友
我正试图了解函数式反应式编程(FRP)的rxpy库,但我已经遇到了一个障碍。我正在编写一个小程序,它期望通过标准输入(sys.stdin)来传输数据。在
rxpy
sys.stdin
因此,我的问题很简单:如何创建一个从stdin异步读取的rx.Observable实例?是否有内置机制从流中创建Observable实例?在
rx.Observable
Observable
我今天刚玩过这个
d = rx.Observable.from_(sys.stdin).subscribe(print)
似乎有效(回波线到标准输出)。from_是{}的别名。 d是可取消订阅的一次性文件。在
from_
d
我从未使用过RxPy,但我对RxJS有点熟悉。在
RxPy
RxJS
RxPy有{a1},您可能会为此目的使用它,但是我倾向于创建一个可观察的工厂。以ObservableCreation.from_array为指导,让我们现在就来试试。(注意:我还没有运行过这段代码,但它应该能让你大有作为)
ObservableCreation.from_array
from rx.observable import Observable, ObservableMeta from rx.anonymousobservable import AnonymousObservable from rx.concurrency import current_thread_scheduler class ObservableFile(Observable, metaclass=ObservableMeta): @classmethod def from_file(cls, readableFile, scheduler=None): scheduler = scheduler or current_thread_scheduler def subscribe(observer): def action(action1, state=None): try: observer.on_next(readableFile.next()) action1(action) except StopIteration: # EOF observer.on_completed() return scheduler.schedule_recursive(action) return AnonymousObservable(subscribe)
那就这样用吧:
这将在stdin的每一行上创建一个可观察的,直到EOF为止。它是阻塞的,但是有ways around that。它也可以使用不同的调度程序进行调整。在
我今天刚玩过这个
似乎有效(回波线到标准输出)。}的别名。
from_
是{d
是可取消订阅的一次性文件。在我从未使用过
RxPy
,但我对RxJS
有点熟悉。在RxPy
有{a1},您可能会为此目的使用它,但是我倾向于创建一个可观察的工厂。以ObservableCreation.from_array
为指导,让我们现在就来试试。(注意:我还没有运行过这段代码,但它应该能让你大有作为)那就这样用吧:
^{pr2}$这将在stdin的每一行上创建一个可观察的,直到EOF为止。它是阻塞的,但是有ways around that。它也可以使用不同的调度程序进行调整。在
相关问题 更多 >
编程相关推荐