asyncio.gather的顺序版本

1 投票
4 回答
2841 浏览
提问于 2025-06-08 05:50

我尝试创建一个方法,类似于 asyncio.gather,但这个方法会顺序执行任务,而不是异步执行:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    for task in tasks:
        await task

接下来,这个方法应该像这样使用:

async def some_work(work_name):
    """Do some work"""
    print(f"Start {work_name}")
    await asyncio.sleep(1)
    if raise_exception:
        raise RuntimeError(f"{work_name} raise an exception")
    print(f"Finish {work_name}")

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),         # work1, work2, in_sequence and work5 executed in concurrently
            some_work("work2"),
            in_sequence(
                some_work("work3"),     # work3 and work4 executed in sequence
                some_work("work4")
            ),
            some_work("work5"),


    except RuntimeError as error:
        print(error)                    # raise an exception at any point to terminate

一切都运行得很好,直到我在 some_work 中抛出了一个异常:

async def main():
    try:
        await asyncio.gather(
            some_work("work1"),
            some_work("work2"),
            in_sequence(
                some_work("work3", raise_exception=True),       # raise an exception here
                some_work("work4")
            ),
            some_work("work5"),


    except RuntimeError as error:
        print(error)

紧接着,我收到了以下错误信息:

RuntimeWarning: coroutine 'some_work' was never awaited

我查看了文档,并继续进行实验:

async def in_sequence(*tasks):
    """Executes tasks in sequence"""
    _tasks = []
    for task in tasks:
        _tasks.append(asyncio.create_task(task))

    for _task in _tasks:
        await _task

这个版本按预期工作了!

在这方面,我有以下几个问题:

  1. 为什么第二个版本能工作,而第一个不行?
  2. asyncio 是否已经有工具可以顺序执行任务列表?
  3. 我选择的实现方法对吗?还是有更好的选择?

相关问题:

  • 暂无相关问题
暂无标签

4 个回答

1

受到用户4815162342和Anton Pomieshchenko解决方案的启发,我想出了这个变体:

async def in_sequence(*storm):
    twister = iter(storm)
    for task in twister:
        task = task() # if it's a regular function, it's done here.
        if inspect.isawaitable(task):
            try:
                await task # if it's also awaitable, await it
            except BaseException as e:
                task.throw(e) # if an error occurs, throw it into the coroutine
            finally:
                task.close() # to ensure coroutine closer

    assert not any(twister) # optionally verify that the iterator is now empty

这样你就可以把普通函数和协程结合起来,使用这个 in_sequence。不过要确保这样调用它:

await in_sequence(*[b.despawn, b.release])

注意这里没有使用 ()(也就是 __call__()),因为如果加上了,普通函数会立刻被调用,而协程会因为没有被等待而抛出 RuntimeWarning 警告。(在我的例子中, b.despawn 是一个协程,而 b.release 不是)

你也可以在调用 task() 之前额外检查一下 callable(task),但这就看你自己了。

1

这个版本的效果如预期一样好!

第二个版本的问题在于,它并没有顺序执行协程,而是并行执行。这是因为 asyncio.create_task() 会让协程和当前的协程一起并行运行。所以当你在循环中等待任务时,实际上是让所有任务同时运行,而不是一个接一个地等。尽管看起来像是顺序执行,但整个循环的运行时间只会取决于最长的任务。(想了解更多,可以查看这里。)

你第一个版本中显示的警告是为了防止你不小心创建一个从未被等待的协程,比如只写 asyncio.sleep(1) 而不是 await asyncio.sleep(1)。在 asyncio 看来,main 是在创建协程对象,并把它们传给 in_sequence,而这个函数却“忘记”去等待其中的一些。

抑制警告信息的一种方法是让协程运行,但立即取消它。例如:

async def in_sequence(*coros):
    remaining = iter(coros)
    for coro in remaining:
        try:
            await coro
        except Exception:
            for c in remaining:
                asyncio.create_task(c).cancel()
            raise

注意,以下划线开头的变量名表示这个变量没有被使用,所以如果你真的用到它们,就不要这样命名。

1
  1. 第一个版本不管用,因为 in_sequence 没有处理在 await task 时可能出现的错误。第二个版本能正常工作,因为 create_task 创建了一个类似未来对象的 任务,这个任务会运行协程。这个对象不会返回或传播被包裹的协程的结果。当你 await 这个对象时,它会暂停,直到有 结果异常设置,或者直到 它被取消

  2. 看起来它没有。

  3. 第二个版本会同时执行传入的协程,所以这不是正确的实现。如果你真的想用某个 in_sequence 函数,你可以:
    • 以某种方式延迟协程的创建。
    • 在一个 async 函数中分组顺序执行。

例如:

async def in_sequence(*fn_and_args):
    for fn, args, kwargs in fn_and_args:
        await fn(*args, **kwargs)  # create a coro and await it in place

in_sequence(
    (some_work, ("work3",), {'raise_exception': True}),
    (some_work, ("work4",), {}),
)
async def in_sequence():
    await some_work("work3", raise_exception=True)
    await some_work("work4")
2

你提到的 in_sequence 版本在使用 asyncio.create_task 时是有效的,但我觉得并不是这样。从文档来看:

将协程(coro)封装成一个任务(Task),并安排它的执行。返回这个任务对象。

看起来这个方法是并行运行协程,但你其实需要的是按顺序执行。

所以我做了一些实验,发现了两种解决这个问题的方法。

你可以使用原来的 in_sequence 函数,然后加上这段代码,来隐藏那个错误:

import warnings
warnings.filterwarnings(
    'ignore',
    message=r'^coroutine .* was never awaited$',
    category=RuntimeWarning
)

或者可以这样修复 in_sequence 函数:

async def in_sequence(*tasks):
    for index, task in enumerate(tasks):
        try:
            await task
        except Exception as e:
            for task in tasks[index + 1:]:
                task.close()
            raise e

关于其他问题的回答:

  1. 那个警告是由 C++ 代码触发的,当你没有链接到协程时。下面的简单代码可以让你理解这个概念(在终端中运行):

async def test():
    return 1

f = test()
f = None # after that you will get that error
  1. 我不知道
  2. 见上文

撰写回答