Python ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) 的 asyncio 对应实现

0 投票
1 回答
33 浏览
提问于 2025-04-14 16:39

当我使用ThreadPoolExecutor时,我可以像这样发送一批请求,并限制并发请求的数量:

with ThreadPoolExecutor(max_workers=MAX_PARALLEL_REQUESTS) as pool:
    results = list(pool.map(request_func, requests_input_data))

那么我该如何用asyncio来实现这个功能呢?有没有现成的库可以用,还是说我需要自己写代码,比如“等第一个请求完成后再添加一个新请求”?

1 个回答

2

Python的asyncio有一个叫做run_in_executor的功能,它可以在一个线程池中运行同步代码,这样你就可以得到完全相同的效果。

如果你想添加新的异步任务,并且限制正在运行的任务数量,那你就得自己写代码,可能需要用到asyncio.Semaphore和wait这个功能。

其实写出一个能用的版本并不难,你可以根据需要不断改进接口,比如返回异常、忽略错误或抛出错误,超时后返回部分结果等等……

如果想要一个在返回之前能运行所有任务的版本,可以用更简短的代码来实现。下面的类大部分是模板代码,真正的核心逻辑只有results方法里的四行。

import asyncio
from collections import deque

class AsyncExecutor:
    """Automatic Async Task manager that will limit the number of concurrent tasks started"""

    def __init__(self, max_workers=5, debug=False):
        self.loop = asyncio.get_running_loop()
        self.max_workers = max_workers
        self.pending_tasks = deque()
        self.tasks = set()
        self.debug = debug

    def submit(self, coro, args=(), kwargs=None):
        if not kwargs: kwargs = {}
        if len(self.tasks) < self.max_workers:
            self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
        else:
            self.pending_tasks.append((coro, args, kwargs))

    def map(self, coro, args_collection=()):
        for args in args_collection:
            self.submit(coro, args)

    async def results(self):
        results = []
        while self.pending_tasks or self.tasks:
            if self.debug:
                print(f"running tasks: {len(self.tasks)}, waiting tasks: {len(self.pending_tasks)}")
            done, in_process = await asyncio.wait(self.tasks, return_when=asyncio.FIRST_COMPLETED)
            self.tasks = in_process
            qtd_new_tasks = max(0, self.max_workers - len(in_process))
            for i in range(qtd_new_tasks):
                if not self.pending_tasks:
                    break
                coro, args, kwargs = self.pending_tasks.popleft()
                self.tasks.add(self.loop.create_task(coro(*args, **kwargs)))
            results.extend(task.result() for task in done)
        return results


async def test_task(i):
    await asyncio.sleep(1)
    return i

async def main():
    ex = AsyncExecutor(3, debug=True)
    ex.map(test_task, [(i,) for i in range(10)])
    print(await ex.results())

asyncio.run(main())

注意,这段代码会避免直接创建任务,而是保持协程函数和它的参数,这样做是为了防止在创建任务时(把它作为对象放在“.pending_tasks”里),asyncio循环会自动处理这些待处理的任务。每当异步代码遇到await时,它会检查所有准备好的任务。在“现实生活”中,这些任务可能会启动一个HTTP API请求或者SQL查询,而目标服务器可能会因为请求过多而崩溃,尽管我们小心地只选择“max_workers”数量的结果。

更简单的做法确实是使用asyncio信号量(就像我之前提到的,虽然在这段代码中没有用到)——但要在任务代码内部使用。

举个例子:

max_workers = 3
semaphore = asyncio.Semaphore(max_workers)

async def test_task(i):
    async with semaphore:
        # here, instead of `asyncio.sleep` we'd do an async HTTP request to a server
        await asyncio.sleep(1)
    return i

如果有很多这样的协程代表一个任务,可以使用装饰器来自动限制同时启动的任务数量,确保它们不会同时针对同一个I/O资源。

撰写回答