Python ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) 的 asyncio 对应实现
当我使用ThreadPoolExecutor时,我可以像这样发送一批请求,并限制并发请求的数量:
with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
results = list(pool.map(request_func, requests_input_data))
那么我该如何用asyncio来实现这个功能呢?有没有现成的库可以用,还是说我需要自己写代码,比如“等第一个请求完成后再添加一个新请求”?
1 个回答
2
Python的asyncio有一个叫做run_in_executor
的功能,它可以在一个线程池中运行同步代码,这样你就可以得到完全相同的效果。
如果你想添加新的异步任务,并且限制正在运行的任务数量,那你就得自己写代码,可能需要用到asyncio.Semaphore和wait
这个功能。
其实写出一个能用的版本并不难,你可以根据需要不断改进接口,比如返回异常、忽略错误或抛出错误,超时后返回部分结果等等……
如果想要一个在返回之前能运行所有任务的版本,可以用更简短的代码来实现。下面的类大部分是模板代码,真正的核心逻辑只有results
方法里的四行。
import asyncio
from collections import deque
class AsyncExecutor:
"""Automatic Async Task manager that will limit the number of concurrent tasks started"""
def __init__(self, max_workers=5, debug=False):
self.loop = asyncio.get_running_loop()
self.max_workers = max_workers
self.pending_tasks = deque()
self.tasks = set()
self.debug = debug
def submit(self, coro, args=(), kwargs=None):
if not kwargs: kwargs = {}
if len(self.tasks) < self.max_workers:
self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
else:
self.pending_tasks.append((coro, args, kwargs))
def map(self, coro, args_collection=()):
for args in args_collection:
self.submit(coro, args)
async def results(self):
results = []
while self.pending_tasks or self.tasks:
if self.debug:
print(f"running tasks: {len(self.tasks)}, waiting tasks: {len(self.pending_tasks)}")
done, in_process = await asyncio.wait(self.tasks, return_when=asyncio.FIRST_COMPLETED)
self.tasks = in_process
qtd_new_tasks = max(0, self.max_workers - len(in_process))
for i in range(qtd_new_tasks):
if not self.pending_tasks:
break
coro, args, kwargs = self.pending_tasks.popleft()
self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
results.extend(task.result() for task in done)
return results
async def test_task(i):
await asyncio.sleep(1)
return i
async def main():
ex = AsyncExecutor(3, debug=True)
ex.map(test_task, [(i,) for i in range(10)])
print(await ex.results())
asyncio.run(main())
注意,这段代码会避免直接创建任务,而是保持协程函数和它的参数,这样做是为了防止在创建任务时(把它作为对象放在“.pending_tasks”里),asyncio循环会自动处理这些待处理的任务。每当异步代码遇到await
时,它会检查所有准备好的任务。在“现实生活”中,这些任务可能会启动一个HTTP API请求或者SQL查询,而目标服务器可能会因为请求过多而崩溃,尽管我们小心地只选择“max_workers”数量的结果。
更简单的做法确实是使用asyncio信号量(就像我之前提到的,虽然在这段代码中没有用到)——但要在任务代码内部使用。
举个例子:
max_workers = 3
semaphore = asyncio.Semaphore(max_workers)
async def test_task(i):
async with semaphore:
# here, instead of `asyncio.sleep` we'd do an async HTTP request to a server
await asyncio.sleep(1)
return i
如果有很多这样的协程代表一个任务,可以使用装饰器来自动限制同时启动的任务数量,确保它们不会同时针对同一个I/O资源。