如何获取Python/FastAPI中事件循环的所有待处理任务?

4 投票
1 回答
108 浏览
提问于 2025-04-14 16:19

我正在尝试理解我的FastAPI应用可能出现的速度变慢的问题。

我理解的是,每次我使用await的时候,都会把一个任务安排到后面去执行,这样就会形成一个待处理任务的堆积。如果某种原因导致处理速度变慢,那么待处理的任务数量就会增加。我该如何监控这些待处理任务的数量呢?

1 个回答

3

你可以使用 asyncio.all_tasks(loop=None) 来实现这个功能:

这个方法会返回一个集合,里面包含了所有还没有完成的 任务 对象,这些任务是由事件循环运行的。

如果 loop 是 None,那么会使用 get_running_loop() 来获取当前的事件循环。

正如在 这篇文章 中所描述的(我建议你去看看):

如何获取所有 Asyncio 任务

我们可能需要访问一个 asyncio 程序中的所有任务。

这可能有很多原因,比如:

  • 查看程序的当前状态或复杂性。
  • 记录所有正在运行的任务的详细信息。
  • 找到一个可以查询或取消的任务。

我们可以通过 asyncio.all_tasks() 函数获取所有已调度和正在运行(尚未完成)的任务集合。

例如:

# get all tasks 
tasks = asyncio.all_tasks() 

这将返回一个包含所有任务的集合。

因为是集合,所以每个任务只会出现一次。

如果任务会被包含在内,条件是:

  • 任务已经被调度但尚未运行。
  • 任务当前正在运行(例如,可能暂时挂起)

这个集合还会包括当前正在运行的任务,比如执行调用 asyncio.all_tasks() 函数的协程的任务。

另外,记住,asyncio.run() 方法用于启动 asyncio 程序时,会将提供的协程包装成一个任务。这意味着所有任务的集合也会包括程序的入口点任务

此外,正如在这篇 相关的文章 中解释的那样,我们可以监控并获取所有正在运行的任务的详细信息,以便检测任何卡住的长时间运行任务,具体方法如下:

...
# get all tasks
for task in asyncio.all_tasks():
    # log the task
    logging.debug(task)

这将返回类似于:

DEBUG:root:<Task pending name='Task-2' coro=<work() running at /...> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[TaskGroup._on_task_done()]>

此外,我们还可以报告每个任务的完整堆栈跟踪:

...
# get all tasks
for task in asyncio.all_tasks():
    # report the trace
    task.print_stack()

最后,你对 async/await(协程)在异步编程中的理解可能不完全正确,因此我建议你查看 这个回答以获取更多详细信息。

撰写回答