asyncio:从execu中的异步函数收集结果

2024-04-16 04:13:10 发布

您现在位置:Python中文网/ 问答频道 /正文

我想启动大量的HTTP请求并收集它们的结果,一旦它们全部返回。使用asyncio可以以非阻塞的方式发送请求,但是我在收集结果时遇到了问题。在

我知道有一些解决方案,如aiohttp是针对这个特定问题而制定的。但是HTTP请求只是一个例子,我的问题是如何正确使用asyncio。在

在服务器端,我有一个flask,它用“Hello World!”,但它等待0.1秒后才回答。在我所有的例子中,我发送了10个请求。同步代码大约需要1秒,异步版本可以在0.1秒内完成。在

在客户端,我希望同时启动多个请求并收集它们的结果。我试着用三种不同的方法来做这件事。因为asyncio需要一个执行器来处理阻塞代码,所以所有的方法都调用loop.run_in_executor。在

此代码在他们之间共享:

import requests
from time import perf_counter
import asyncio

loop = asyncio.get_event_loop()

async def request_async():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

def request_sync():
    r = requests.get("http://127.0.0.1:5000/")
    return r.text

方法1:

在任务列表中使用asyncio.gather(),然后使用run_until_complete。在读取Asyncio.gather vs asyncio.wait之后,gather似乎会等待结果。但是没有,所以这段代码会立即返回,而不需要等待请求完成。 如果我在这里使用一个阻塞函数,这是有效的。为什么我不能使用异步函数?在

^{pr2}$

Python甚至警告我,coroutine "request_async"从未被等待过。 现在,我有一个可行的解决方案:在执行器中使用普通(非异步)函数。但我希望有一个解决方案,它可以与async函数定义一起工作。因为我希望在它们内部使用await(在这个简单的例子中这是不必要的,但是如果我将更多的代码移到asyncio,我相信它会变得很重要)。在

方法2:

Python警告我,我的协同工作永远不会被等待。所以让我们等他们吧。方法2将所有代码封装到一个外部异步函数中,并等待集合的结果。同样的问题,也会立即返回(同样的警告):

# approach 2
async def main():

    tasks = []
    for i in range(10):
        tasks.append(loop.run_in_executor(None, request_async))

    gathered_tasks = asyncio.gather(*tasks)

    return await gathered_tasks # <-------- here I'm waiting on the coroutine 

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()
print(f"finished {stop - start}")  # 0.0036

这让我很困惑。我正在等待gather的结果。直觉上应该传播到我正在收集的协同程序中。但是python仍然抱怨我的合作计划从未被等待过。在

我又读了一些,发现:How could I use requests in asyncio?

这几乎就是我的例子:组合requests和{}。这让我想到方法3:

方法3:

与方法2的结构相同,但要分别等待分配给run_in_executor()的每个任务(这当然算作等待协同程序):

# approach 3:
# wrapping executor in coroutine
# awaiting every task individually
async def main():

    tasks = []
    for i in range(10):
        task = loop.run_in_executor(None, request_async)
        tasks.append(task)

    responses = []
    for task in tasks:
        response = await task
        responses.append(response)

    return responses

start = perf_counter()
results = loop.run_until_complete(main())
stop = perf_counter()

print(f"finished {stop - start}") # 0.004578

我的问题是:我希望在我的协同程序中有阻塞代码,并与执行器并行运行它们。我如何得到他们的结果?在


Tags: 方法函数run代码inloopasyncioasync
1条回答
网友
1楼 · 发布于 2024-04-16 04:13:10

My question is: I want to have blocking code in my coroutines and run them in parallel with an executor. How do I get their results ?

答案是你的协同程序中不应该有阻塞代码。如果您必须拥有它,则必须使用run_in_executor将其隔离。所以写request_async(使用requests)的唯一正确方法是:

async def request_async():
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, request_sync)

request_async赋予run_in_executor是注定的,因为run_in_executor的整个将在另一个线程中调用sync函数。如果您给它一个协程函数,它会很高兴地调用它(在另一个线程中),并将返回的协程对象作为“result”提供。这相当于将一个生成器传递给一个需要普通函数的代码-是的,它会很好地调用生成器,但它不知道如何处理返回的对象。在

更重要的是,不能仅仅将async放在def前面,然后期望得到一个可用的协同程序。协同程序不能阻塞,除非等待其他异步代码。在

现在,一旦您有了一个可用的request_async,您可以像这样收集它的结果:

^{pr2}$

相关问题 更多 >