rxpy有效地合成可观测值

2024-05-29 03:29:55 发布

您现在位置:Python中文网/ 问答频道 /正文

简介: 你好我正在为我的用例探索python rxpy库——在这里,我正在使用反应式编程概念构建一个执行管道。这样我就不用操纵太多的州了。虽然我的解决方案似乎是功能性的,但我在尝试从其他观察值合成新的观察值时遇到了困难

问题是,我合成观测值的方式导致一些昂贵的计算重复两次。对于性能,我确实希望避免触发昂贵的计算

我对反应式编程非常陌生。试图搔搔我的头,浏览互联网资源和参考文献——对我来说似乎有点过于简练,难以理解。请给我一些建议

下面是一个玩具示例,说明了我在做什么:

import rx
from rx import operators as op
from rx.subject import Subject

root = Subject()

foo = root.pipe(
        op.map( lambda x : x + 1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r)))
    )

bar_foo = foo.pipe(
        op.map( lambda x : x * 2 ),
        op.do_action(lambda r: print("bar(foo(x)) = %s" % str(r)))
    )

bar_foo.pipe(
        op.zip(foo),
        op.map(lambda i: i[0]+i[1]),
        op.do_action(lambda r: print("foo(x) + bar(foo(x)) = %s" % str(r)))
    ).subscribe()


print("-------------")
root.on_next(10)
print("-------------")

输出:

-------------
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) = 11 (expensive)
foo(x) + bar(foo(x)) = 33
-------------

您可以认为foo()bar()是昂贵且复杂的操作。我首先构建一个可观察的foo。然后组成一个新的可观察的bar_foo,它包含foo。稍后,将两者压缩在一起以计算最终结果foo(x)+bar(foo(x))

问题:

  1. 如何防止foo()对单个输入触发一次以上? 我有很强的理由把foo()bar()分开。另外,我也不想显式地记忆foo()

  2. 任何有在生产中使用rxpy经验的人都可以分享他们的经验。与同等的手工制作(但不可维护)代码相比,使用rxpy会带来更好的性能还是降低速度


Tags: lambdaimportmapfoobaractionrootrx
1条回答
网友
1楼 · 发布于 2024-05-29 03:29:55

foo管道中的昂贵计算之后立即添加op.share()在这里可能很有用。因此,将foo管道更改为:

foo = root.pipe(
        op.map( lambda x : x + 1 ),
        op.do_action(lambda r: print("foo(x) = %s (expensive)" % str(r))),
        op.share() # added to pipeline
    )

将导致:

      -
foo(x) = 11 (expensive)
bar(foo(x)) = 22
foo(x) + bar(foo(x)) = 33
      -

我相信.share()使昂贵操作的发出事件在下游订户之间共享,因此单个昂贵计算的结果可以多次使用

关于你的第二个问题;我也是RxPy新手,所以对经验丰富的用户的答案很感兴趣。到目前为止,我注意到,作为初学者,您可以轻松创建(糟糕的)管道,其中消息和计算会在后台重复.share()似乎在某种程度上减少了这种情况,但不确定背景中发生了什么

相关问题 更多 >

    热门问题