将asyncfor与if条件相结合以中断mid Wait的正确方法是什么?

2024-05-20 23:04:28 发布

您现在位置:Python中文网/ 问答频道 /正文

如果我有一个使用异步生成器项的协同程序,那么从外部条件终止该循环的“最佳”方法是什么

考虑这个,

while not self.shutdown_event.is_set():
    async with self.external_lib_client as client:
        async for message in client:
            if self.shutdown_event.is_set():
                break
            await self.handle(message)

如果我设置shutdown_event,它将跳出while循环,但直到下一个messageasync for循环处理完毕。什么是构造async for迭代器的正确方法,以便在迭代器之间满足条件时可以短路,从而产生结果

是否有添加Timeout的标准方法


Tags: 方法selfclienteventmessageforasyncis
1条回答
网友
1楼 · 发布于 2024-05-20 23:04:28

一种方法是将迭代移动到async def并使用取消:

async def iterate(client):
    async for message in client:
        # shield() because we want cancelation to cancel retrieval
        # of the next message, not ongoing handling of a message
        await asyncio.shield(self.handle(message))

async with self.external_lib_client as client:
    iter_task = asyncio.create_task(iterate(client))
    shutdown_task = asyncio.create_task(self.shutdown_event.wait())
    await asyncio.wait([iter_task, shutdown_task],
                       return_when=asyncio.FIRST_COMPLETED)
    if iter_task.done():
        # iteration has completed, access result to propagate the
        # exception if one was raised
        iter_task.result()
        shutdown_task.cancel()
    else:
        # shutdown was requested, cancel iteration
        iter_task.cancel()

另一种方法是将shutdown_event转换为一次性异步流,并使用aiostream来监视这两个流。这样for循环在发出关闭事件信号时获得一个对象,并且可以中断循环而不必费心等待下一条消息:

# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())

async with self.external_lib_client as client, \
        aiostream.stream.merge(done_stream, client).stream() as stream:
    async for message in stream:
        # the merged stream will provide a bogus value (whatever
        # `shutdown_event.wait()` returned) when the event is set,
        # so check that before using `message`:
        if self.shutdown_event.is_set():
            break
        await self.handle(message)

注意:由于问题中的代码不可运行,以上示例未经测试

相关问题 更多 >