使用create_任务创建的任务从未等待,似乎打破了取消子任务的期望

2024-05-01 21:49:00 发布

您现在位置:Python中文网/ 问答频道 /正文

假设我们正在编写一个应用程序,该应用程序允许用户连续运行一个应用程序(假设这是针对API的一系列重要操作),并且可以并发运行多个应用程序。要求包括:

  • 用户可以控制并发应用程序的数量(这可能会限制API的并发负载,这通常很重要)
  • 如果操作系统试图关闭运行这个东西的Python程序,它应该正常终止,允许任何正在运行的应用程序在关闭之前完成运行

这里的问题是关于我们编写的任务管理器的,所以让我们去掉一些说明此问题的代码:

import asyncio
import signal


async def work_chunk():
    """Simulates a chunk of work that can possibly fail"""
    await asyncio.sleep(1)


async def protected_work():
    """All steps of this function MUST complete, the caller should shield it from cancelation."""
    print("protected_work start")
    for i in range(3):
        await work_chunk()
        print(f"protected_work working... {i+1} out of 3 steps complete")
    print("protected_work done... ")


async def subtask():
    print("subtask: starting loop of protected work...")
    cancelled = False
    while not cancelled:
        protected_coro = asyncio.create_task(protected_work())
        try:
            await asyncio.shield(protected_coro)
        except asyncio.CancelledError:
            cancelled = True
            await protected_coro
    print("subtask: cancelation complete")


async def subtask_manager():
    """
    Manage a pool of subtask workers. 
    (In the real world, the user can dynamically change the concurrency, but here we'll 
    hard code it at 3.)
    """
    tasks = {}
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())
        await asyncio.sleep(5)


def shutdown(signal, main_task):
    """Cleanup tasks tied to the service's shutdown."""
    print(f"Received exit signal {signal.name}. Scheduling cancelation:")
    main_task.cancel()


async def main():
    print("main... start")
    coro = asyncio.ensure_future(subtask_manager())
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, coro))
    loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, coro))
    await coro
    print("main... done")


def run():
    asyncio.run(main())


run()

subtask_manager管理一个工作线程池,定期查找当前的并发需求是什么,并适当地更新活动工作线程的数量(请注意,上面的代码删去了其中的大部分,只需硬编码一个数字,因为这对问题并不重要)

subtask是工作循环本身,它持续运行protected_work(),直到有人取消它

但是这个密码被破解了。当你给它一个信号,整个事情立刻崩溃

Screenshot of log output from invoking run(), illustrating that CancelledError is raised in the "except" block of the subtask function.

在我进一步解释之前,让我向您指出一段关键代码:

1   protected_coro = asyncio.create_task(protected_work())
2   try:
3       await asyncio.shield(protected_coro)
4   except asyncio.CancelledError:
5       cancelled = True
6       await protected_coro  # <-- This will raise CancelledError too!

经过一些调试后,我们发现try/except块不起作用。我们发现第3行和第6行都会引起取消错误

当我们进一步深入研究时,我们发现所有“等待”调用都会在子任务管理器被取消后抛出CanceledError,而不仅仅是上面提到的行。(即,第二行工作组(),await asyncio.sleep(1),第四行受保护工作组(),await work_chunk()引发取消错误。)

这是怎么回事

由于某种原因,Python似乎并没有像您所期望的那样传播取消,只是举手说“我现在正在取消一切”

为什么

显然,我不理解Python中的取消传播是如何工作的。我一直在努力寻找关于它如何工作的文档。有人能向我描述一下取消是如何以一种清晰的方式传播的,从而解释上述示例中的行为吗


Tags: oftheasyncio应用程序taskasyncsignalmain
2条回答

在研究了这个问题很长一段时间后,并对其他代码片段进行了实验(取消传播如预期的那样工作),我开始怀疑问题是否在于Python在这种情况下不知道这里的传播顺序

但是为什么呢

嗯,subtask_manager创建任务,但不等待它们

难道Python不认为创建该任务(与create_task)的协同程序拥有该任务吗?我认为Python使用await关键字专门来知道以什么顺序传播取消,如果在遍历整个任务树之后,它发现仍然没有取消的任务,它只会将它们全部销毁

因此,在我们知道没有等待异步任务的任何地方,我们都可以自己管理任务取消传播。因此,我们需要重构subtask_manager以捕获其自身的取消,并显式取消,然后等待其所有子任务:

async def subtask_manager():
    """
    Manage a pool of subtask workers. 
    (In the real world, the user can dynamically change the concurrency, but here we'll 
    hard code it at 3.)
    """
    tasks = {}
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())
        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            print("cancelation detected, canceling children")
            [t.cancel() for t in tasks.values()]
            await asyncio.gather(*[t for t in tasks.values()])
            return

现在,我们的代码按预期工作:

Working log output.

注意:我已经回答了我自己的问题;一种风格,但我仍然对我关于取消传播工作原理的文本回答感到不满意。如果有人对取消传播的工作原理有更好的解释,我很乐意阅读

What's going on here? It would seem that Python, for some reason, isn't propagating cancelation as you would expect, and just throws up its hands and says "I'm canceling everything now".

TL;DR取消一切正是正在发生的事情,因为事件循环正在退出

为了研究这一点,我将add_signal_handler()的调用更改为loop.call_later(.5, lambda: shutdown(signal.SIGINT, coro))。Python的Ctrl+C处理有odd corners,我想检查这种奇怪的行为是否是这种情况的结果。但是这个错误在没有信号的情况下是完全可以复制的,所以不是这样

然而,异步IO取消确实不应该像代码显示的那样工作。取消一个任务会传播到它等待的未来(或另一个任务),但是shield是专门实现的,以避免这种情况。它创建并返回一个新的未来,并以cancel()不知道如何遵循的方式将原始(屏蔽)未来的结果连接到新的未来

我花了一些时间才发现真正发生的事情,那就是:

  • main末尾的await coro等待被取消的任务,因此只要shutdown取消它,它就会得到一个CancelledError

  • 异常导致main退出,并在asyncio.run()的末尾进入清理序列。此清理序列取消所有任务,包括您屏蔽的任务

您可以通过将main()末尾的await coro更改为:

try:
    await coro
finally:
    print('main... done')

你会看到,在你目睹的所有神秘的取消之前,“主要…完成”已经打印出来

为了解开谜团并解决问题,您应该推迟退出main,直到一切都完成。例如,您可以在main中创建tasksdict,将其传递给subtask_manager(),然后在取消主任务时等待这些关键任务:

async def subtask_manager(tasks):
    while True:
        for i in range(3):
            task = tasks.get(i)
            if not task or task.done():
                tasks[i] = asyncio.create_task(subtask())

        try:
            await asyncio.sleep(5)
        except asyncio.CancelledError:
            for t in tasks.values():
                t.cancel()
            raise

# ... shutdown unchanged

async def main():
    print("main... start")
    tasks = {}
    main_task = asyncio.ensure_future(subtask_manager(tasks))
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, main_task))
    loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, main_task))
    try:
        await main_task
    except asyncio.CancelledError:
        await asyncio.gather(*tasks.values())
    finally:
        print("main... done")

请注意,主任务必须显式取消其子任务,因为这实际上不会自动发生。取消通过await链传播,并且subtask_manager并不显式地等待其子任务,它只是生成它们并等待其他任务,有效地屏蔽它们

相关问题 更多 >