2024-04-26 08:14:37 发布
网友
Python asynchronous context managers很有用,但它们do not work与asyncio.wait_for Timeouts结合使用。你知道吗
向异步上下文管理器添加超时的最佳方法是什么?
你试过async-timeout吗?只需用async with timeout()包装任何异步代码(包括异步上下文管理器的使用):
async with timeout()
import asyncio from contextlib import asynccontextmanager from async_timeout import timeout @asynccontextmanager async def my_acm(): print('before') yield print('after') async def main(): async with timeout(1): async with my_acm(): await asyncio.sleep(1.5) asyncio.run(main())
如果只想对异步上下文管理器应用超时,可以创建新的上下文管理器,该管理器利用async-timeout:
async-timeout
import asyncio from contextlib import asynccontextmanager from async_timeout import timeout @asynccontextmanager async def my_acm(): print('before') yield print('after') @asynccontextmanager async def my_acm_plus_timeout(time): async with timeout(time): async with my_acm(): yield async def main(): async with my_acm_plus_timeout(1): await asyncio.sleep(1.5) asyncio.run(main())
What is the best way to add a Timeout to an asynchronous context manager?
不能将wait_for应用于异步上下文管理器,但可以将其应用于使用它的协同例程。因此,要向上下文管理器添加超时,请在异步函数中使用它,并对其应用超时。例如:
wait_for
async def download(url, session): async with session.get(url) as resp: return await resp.text() async def print_google(session): try: text = await asyncio.wait_for(download('http://www.google.com', session), 1) except asyncio.TimeoutError: text = None print(text)
你试过async-timeout吗?只需用
async with timeout()
包装任何异步代码(包括异步上下文管理器的使用):如果只想对异步上下文管理器应用超时,可以创建新的上下文管理器,该管理器利用
async-timeout
:不能将
wait_for
应用于异步上下文管理器,但可以将其应用于使用它的协同例程。因此,要向上下文管理器添加超时,请在异步函数中使用它,并对其应用超时。例如:相关问题 更多 >
编程相关推荐