假设我们正在编写一个应用程序,该应用程序允许用户连续运行一个应用程序(假设这是针对API的一系列重要操作),并且可以并发运行多个应用程序。要求包括:
这里的问题是关于我们编写的任务管理器的,所以让我们去掉一些说明此问题的代码:
import asyncio
import signal
async def work_chunk():
"""Simulates a chunk of work that can possibly fail"""
await asyncio.sleep(1)
async def protected_work():
"""All steps of this function MUST complete, the caller should shield it from cancelation."""
print("protected_work start")
for i in range(3):
await work_chunk()
print(f"protected_work working... {i+1} out of 3 steps complete")
print("protected_work done... ")
async def subtask():
print("subtask: starting loop of protected work...")
cancelled = False
while not cancelled:
protected_coro = asyncio.create_task(protected_work())
try:
await asyncio.shield(protected_coro)
except asyncio.CancelledError:
cancelled = True
await protected_coro
print("subtask: cancelation complete")
async def subtask_manager():
"""
Manage a pool of subtask workers.
(In the real world, the user can dynamically change the concurrency, but here we'll
hard code it at 3.)
"""
tasks = {}
while True:
for i in range(3):
task = tasks.get(i)
if not task or task.done():
tasks[i] = asyncio.create_task(subtask())
await asyncio.sleep(5)
def shutdown(signal, main_task):
"""Cleanup tasks tied to the service's shutdown."""
print(f"Received exit signal {signal.name}. Scheduling cancelation:")
main_task.cancel()
async def main():
print("main... start")
coro = asyncio.ensure_future(subtask_manager())
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGINT, lambda: shutdown(signal.SIGINT, coro))
loop.add_signal_handler(signal.SIGTERM, lambda: shutdown(signal.SIGTERM, coro))
await coro
print("main... done")
def run():
asyncio.run(main())
run()
subtask_manager
管理一个工作线程池,定期查找当前的并发需求是什么,并适当地更新活动工作线程的数量(请注意,上面的代码删去了其中的大部分,只需硬编码一个数字,因为这对问题并不重要)
subtask
是工作循环本身,它持续运行protected_work()
,直到有人取消它
但是这个密码被破解了。当你给它一个信号,整个事情立刻崩溃
在我进一步解释之前,让我向您指出一段关键代码:
1 protected_coro = asyncio.create_task(protected_work())
2 try:
3 await asyncio.shield(protected_coro)
4 except asyncio.CancelledError:
5 cancelled = True
6 await protected_coro # <-- This will raise CancelledError too!
经过一些调试后,我们发现try/except块不起作用。我们发现第3行和第6行都会引起取消错误
当我们进一步深入研究时,我们发现所有“等待”调用都会在子任务管理器被取消后抛出CanceledError,而不仅仅是上面提到的行。(即,第二行工作组(),await asyncio.sleep(1)
,第四行受保护工作组(),await work_chunk()
,也引发取消错误。)
这是怎么回事
由于某种原因,Python似乎并没有像您所期望的那样传播取消,只是举手说“我现在正在取消一切”
为什么
显然,我不理解Python中的取消传播是如何工作的。我一直在努力寻找关于它如何工作的文档。有人能向我描述一下取消是如何以一种清晰的方式传播的,从而解释上述示例中的行为吗
在研究了这个问题很长一段时间后,并对其他代码片段进行了实验(取消传播如预期的那样工作),我开始怀疑问题是否在于Python在这种情况下不知道这里的传播顺序
但是为什么呢
嗯,
subtask_manager
创建任务,但不等待它们难道Python不认为创建该任务(与
create_task
)的协同程序拥有该任务吗?我认为Python使用await
关键字专门来知道以什么顺序传播取消,如果在遍历整个任务树之后,它发现仍然没有取消的任务,它只会将它们全部销毁因此,在我们知道没有等待异步任务的任何地方,我们都可以自己管理任务取消传播。因此,我们需要重构
subtask_manager
以捕获其自身的取消,并显式取消,然后等待其所有子任务:现在,我们的代码按预期工作:
注意:我已经回答了我自己的问题;一种风格,但我仍然对我关于取消传播工作原理的文本回答感到不满意。如果有人对取消传播的工作原理有更好的解释,我很乐意阅读
TL;DR取消一切正是正在发生的事情,因为事件循环正在退出
为了研究这一点,我将
add_signal_handler()
的调用更改为loop.call_later(.5, lambda: shutdown(signal.SIGINT, coro))
。Python的Ctrl+C处理有odd corners,我想检查这种奇怪的行为是否是这种情况的结果。但是这个错误在没有信号的情况下是完全可以复制的,所以不是这样然而,异步IO取消确实不应该像代码显示的那样工作。取消一个任务会传播到它等待的未来(或另一个任务),但是
shield
是专门实现的,以避免这种情况。它创建并返回一个新的未来,并以cancel()
不知道如何遵循的方式将原始(屏蔽)未来的结果连接到新的未来我花了一些时间才发现真正发生的事情,那就是:
main末尾的
await coro
等待被取消的任务,因此只要shutdown
取消它,它就会得到一个CancelledError
异常导致
main
退出,并在asyncio.run()
的末尾进入清理序列。此清理序列取消所有任务,包括您屏蔽的任务您可以通过将
main()
末尾的await coro
更改为:你会看到,在你目睹的所有神秘的取消之前,“主要…完成”已经打印出来
为了解开谜团并解决问题,您应该推迟退出
main
,直到一切都完成。例如,您可以在main
中创建tasks
dict,将其传递给subtask_manager()
,然后在取消主任务时等待这些关键任务:请注意,主任务必须显式取消其子任务,因为这实际上不会自动发生。取消通过
await
链传播,并且subtask_manager
并不显式地等待其子任务,它只是生成它们并等待其他任务,有效地屏蔽它们相关问题 更多 >
编程相关推荐