如何从像stdin这样的流创建rx.py Observable?

5 投票
2 回答
3308 浏览
提问于 2025-04-18 15:01

我正在尝试理解 rxpy 这个库,它是用来做函数式反应式编程(FRP)的,但我遇到了一些困难。我正在写一个小程序,期望通过标准输入(sys.stdin)来接收数据。

所以我的问题很简单:我该如何创建一个 rx.Observable 实例,让它能够异步地从标准输入读取数据?有没有什么内置的方法可以从流中创建 Observable 实例?

2 个回答

3

我今天刚在玩这个,

 d = rx.Observable.from_(sys.stdin).subscribe(print)

看起来是可以工作的(会把内容输出到标准输出)。from_from_iterable 的一个别名。d 是一个可丢弃的对象,用来取消订阅。

4

我之前没用过 RxPy,但对 RxJS 有点了解。

RxPy 有一些 内置的方法,你可以用来实现这个目的,不过我更倾向于自己创建一个 Observable 工厂。我们可以参考 ObservableCreation.from_array,现在就试试看吧。(注意:我没有运行这段代码,但它应该能帮你大致搞定)

from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    observer.on_next(readableFile.next())
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)

然后就可以这样使用:

res = rx.Observable.from_file(sys.stdin)

这段代码会在标准输入的每一行上创建一个可观察对象,直到遇到文件结束符(EOF)。这个过程是阻塞的,但有一些 方法可以解决这个问题。你也可以用不同的调度器来调整它。

撰写回答