我怎么做一个rx.py公司从stdin这样的流中可以观察到吗?

2024-06-06 23:37:03 发布

您现在位置:Python中文网/ 问答频道 /正文

我正试图了解函数式反应式编程(FRP)的rxpy库,但我已经遇到了一个障碍。我正在编写一个小程序,它期望通过标准输入(sys.stdin)来传输数据。在

因此,我的问题很简单:如何创建一个从stdin异步读取的rx.Observable实例?是否有内置机制从流中创建Observable实例?在


Tags: 实例函数程序标准编程stdinsysrx
2条回答

我今天刚玩过这个

 d = rx.Observable.from_(sys.stdin).subscribe(print)

似乎有效(回波线到标准输出)。from_是{}的别名。 d是可取消订阅的一次性文件。在

我从未使用过RxPy,但我对RxJS有点熟悉。在

RxPy有{a1},您可能会为此目的使用它,但是我倾向于创建一个可观察的工厂。以ObservableCreation.from_array为指导,让我们现在就来试试。(注意:我还没有运行过这段代码,但它应该能让你大有作为)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    observer.on_next(readableFile.next())
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)

那就这样用吧:

^{pr2}$

这将在stdin的每一行上创建一个可观察的,直到EOF为止。它是阻塞的,但是有ways around that。它也可以使用不同的调度程序进行调整。在

相关问题 更多 >