异步中循环的并行化

2024-03-28 20:16:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我现在有一个for循环,如下所示

async def process(tasks, num):
      count = 0
      results = []
      for task in tasks:
           if count >= num:
               break
           result = await some_async_task(task)
           if result == 'foo':
               continue
           results.append(result)
           count+=1

我想知道我能不能在这里用“收集”或“等待”。但我不知道如何在那里实现这些if逻辑?就像。。如果count>;=num,我不想不必要地等待任务。 如果有20个任务并且num=4,那么我不想运行全部20个任务。你知道吗


Tags: infortaskasyncifdefcountsome
2条回答

使用aiostream library可以很容易地实现这一点。下面是一个工作示例:

import asyncio
from random import random
from aiostream import stream, pipe


async def some_async_task(i):
    await asyncio.sleep(random())
    return i if random() < 0.2 else None


async def process(task_args, n):
    return await (
        stream.iterate(task_args)
        | pipe.map(some_async_task, task_limit=n)
        | pipe.filter(bool)
        | pipe.take(n)
        | pipe.list()
    )


async def main():
    print(await process(task_args=range(100), n=10))


if __name__ == "__main__":
    asyncio.run(main())

程序将打印成功的前10个任务的列表:

[1, 8, 16, 18, 19, 37, 42, 43, 45, 47]

还请注意,可以使用task_limit参数调整可以同时运行的some_async_task的数量。你知道吗

免责声明:我是项目维护者。

您可以批量处理任务,其大小等于您仍然需要的结果数。如果您将这样的批处理交给asyncio.gather(),它将同时运行它们并保持结果的顺序。例如:

async def process(tasks, num):
    results = []
    task_iter = iter(tasks)
    while len(results) < num:
        next_batch = tuple(itertools.islice(task_iter, num - len(results)))
        if len(next_batch) == 0:
            break
        batch_results = await asyncio.gather(*next_batch)
        results.extend(r for r in batch_results if r == 'foo')

相关问题 更多 >