如何在完成另一个可观察对象时处理另一个可观察对象?

2024-05-29 04:52:52 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个source可观察的,我订阅了一个logger观察者,用于日志记录。在

我还订阅了source,这样我就可以进行计算了。当我的计算完成后,我完成了source,我想处理logger

             +-------------------+
             |                   |
   +---------+ source observable +--------+
   |         |                   |        |
   |         +-------------------+        |
   |                                      |
   |                                      |
+--v---------------+         +------------v--------+
|                  |         |                     |
|     logger       |         |    computations     |
|    (observer)    |         |    (observable)     |
+-------^----------+         +-----------+---------+
        |                                |
        |                                |
        |        dispose logger          |
        +--------------------------------+
            when computations completed

但是,logger并没有在正确的时间得到处理——通常会出现一到两个额外的滴答声:

MWE公司

^{pr2}$

但我得到:

Traceback (most recent call last):
  File "C:\Program Files (x86)\Python27\lib\site-packages\IPython\core\interactiveshell.py", line 3035, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-54-e8cb1fb583bf>", line 1, in <module>
    assert logged == [0, 1, 2, 3, 4, 5], logged
AssertionError: [0, 1, 2, 3, 4, 5, 6, 7]

7是怎么被记录下来的?我们的计算应该在source发出5之后终止,此时{}将被释放。在

我做错什么了?在


Tags: inselfsource记录linecodeloggerobserver
1条回答
网友
1楼 · 发布于 2024-05-29 04:52:52

这是线程同步问题。interval()运算符启动新线程,以指定的时间间隔调用{}。一旦您处理了订阅,在其他线程检测到该信号并停止工作之前,需要一段时间。一毫秒的时间已经接近了。在

为了记录通过反应链的消息,在该链中插入日志功能更可靠:

logged = []
def logger(x):
    logged.append(x)
    return x

calculated = source \
    .map(logger) \
    .map(lambda x: x**2) \
    .take_while(lambda x: x < 20) \
    .subscribe(print, print)

相关问题 更多 >

    热门问题