如何从像stdin这样的流创建rx.py Observable?
我正在尝试理解 rxpy
这个库,它是用来做函数式反应式编程(FRP)的,但我遇到了一些困难。我正在写一个小程序,期望通过标准输入(sys.stdin
)来接收数据。
所以我的问题很简单:我该如何创建一个 rx.Observable
实例,让它能够异步地从标准输入读取数据?有没有什么内置的方法可以从流中创建 Observable
实例?
2 个回答
3
我今天刚在玩这个,
d = rx.Observable.from_(sys.stdin).subscribe(print)
看起来是可以工作的(会把内容输出到标准输出)。from_
是 from_iterable
的一个别名。d
是一个可丢弃的对象,用来取消订阅。
4
我之前没用过 RxPy
,但对 RxJS
有点了解。
RxPy
有一些 内置的方法,你可以用来实现这个目的,不过我更倾向于自己创建一个 Observable 工厂。我们可以参考 ObservableCreation.from_array
,现在就试试看吧。(注意:我没有运行这段代码,但它应该能帮你大致搞定)
from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler
class ObservableFile(Observable, metaclass=ObservableMeta):
@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)
except StopIteration: # EOF
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)
然后就可以这样使用:
res = rx.Observable.from_file(sys.stdin)
这段代码会在标准输入的每一行上创建一个可观察对象,直到遇到文件结束符(EOF)。这个过程是阻塞的,但有一些 方法可以解决这个问题。你也可以用不同的调度器来调整它。