如何使用可观察的RxPY间隔周期性地调用异步协同程序?

2024-04-20 03:30:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我需要创建一个可观察的流,它以固定的间隔发出异步协同程序的结果。在

intervalRead是一个返回可观察值的函数,它以间隔rate和异步协同程序函数fun作为参数,需要在定义的间隔内调用该函数。在

我的第一个方法是用interval factory方法创建一个observable,然后使用map调用协程,使用from_nufuture将其包装在一个observable中,然后获得协程返回的值。在

async def foo():
    await asyncio.sleep(1)
    return 42

def intervalRead(rate, fun) -> Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        map(lambda i: rx.from_future(loop.create_task(fun()))),
    )

async def main():
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next= lambda item: print(item)
    )

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

然而,我得到的输出不是协同程序的结果,而是在指定的时间间隔内从“未来”返回的可观察值

输出:<rx.core.observable.observable.Observable object at 0x033B5650>

我怎么能得到这个观察值返回的实际值呢?我预计42岁

我的第二个方法是创建一个定制的可观察对象:

^{pr2}$

但是,订阅时from_future(task)从不发出值,为什么会发生这种情况?在

但是如果我这样写intervalRead

def intervalRead(rate, fun):
    loop = asyncio.get_event_loop()
    task = loop.create_task(fun())
    return from_future(task)

我得到了预期的结果:42。显然这并不能解决我的问题,但它让我困惑,为什么它在我的第二种方法中不起作用?在

最后,我使用rx.concurrency CurrentThreadScheduler对第三种方法进行了实验,并使用schedule_periodic方法对操作进行了纵向调度。然而,我面临着与第二种方法相同的问题。在

def funWithScheduler(rate, fun):
    loop = asyncio.get_event_loop()
    scheduler = CurrentThreadScheduler()
    subject = rx.subjects.Subject()
    def action(param):
        obs = rx.from_future(loop.create_task(fun())).subscribe(
            on_next= lambda item: subject.on_next(item),
            on_error= lambda e: print(f'error in action {e}'),
            on_completed= lambda: print('action completed')
        )     
        obs.dispose()   
    scheduler.schedule_periodic(rate,action)
    return subject

如果您能深入了解我缺少什么,或者提供其他建议来完成我需要的东西,我将不胜感激。这是我第一个使用asyncio和RxPY的项目,我只在角度项目的上下文中使用RxJS,所以欢迎任何帮助。在


Tags: 方法lambdafromloopasynciotask间隔return
1条回答
网友
1楼 · 发布于 2024-04-20 03:30:26

你的第一个例子几乎奏效了。要使其正常工作,只需进行两项更改:

首先,from_future的结果是一个可观察的,它发出一个单一的项目(未来完成时的价值)。所以map的输出是一个高阶的可观测的(发出可观测的可观测的)。这些子观察值可以通过在map之后使用merge iu all操作符或者使用flat_map代替map来平坦化。在

然后,interval操作符必须在AsyncIO循环上调度它的计时器,默认情况下不是这样:默认的调度程序是TimeoutScheduler,它会生成一个新线程。因此在原始代码中,由于create_task是从另一个线程调用的,因此无法在AsyncIO事件循环上调度该任务。在subscribe调用中使用scheduler参数声明用于整个操作链的默认调度程序。在

以下代码有效(42每5秒打印一次):

import asyncio
import rx
import rx.operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler


async def foo():
    await asyncio.sleep(1)
    return 42


def intervalRead(rate, fun) -> rx.Observable:
    loop = asyncio.get_event_loop()
    return rx.interval(rate).pipe(
        ops.map(lambda i: rx.from_future(loop.create_task(fun()))),
        ops.merge_all()
    )


async def main(loop):
    obs = intervalRead(5, foo)
    obs.subscribe(
        on_next=lambda item: print(item),
        scheduler=AsyncIOScheduler(loop)
    )

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()

相关问题 更多 >