获取嵌套链和和弦中的所有任务ID

31 投票
2 回答
2724 浏览
提问于 2025-04-18 02:12

我正在使用Celery 3.1.9和Redis作为后端。我正在运行的任务由几个子任务组成,这些子任务以“和弦”和“链”的形式运行。它的结构大致是这样的:

  1. 准备
  2. 下载数据(两个工作者的和弦)
  3. 解析并存储下载的数据
  4. 一个长时间运行的和弦(四个工作者)
  5. 完成
  6. 生成报告

列表中的每一项都是一个子任务,它们都是串联在一起的。步骤2和4是和弦。整个流程是通过为步骤4创建一个和弦来连接的,这个和弦的回调是步骤4到步骤6的链,然后为步骤2创建一个和弦,它的回调是步骤3到第一个和弦。最后,创建一个链来连接步骤1到第二个和弦。这个链通过delay()启动,并将其ID存储在数据库中。

问题有两个方面。首先,我希望能够撤销整个任务,其次,我想在我的Task类中有一个自定义的on_failure方法,用于清理工作,并向用户报告失败情况。

目前我存储了链的任务ID。我本以为可以用这个ID来撤销链。此外,如果出现错误,我想在on_failure处理程序中遍历链到根节点,以从数据库中检索相关记录。但这行不通,因为当你仅用任务的ID重新创建AsyncResult的实例时,它的父属性是None

我尝试的第二种方法是存储在外部链结果上调用serializable()的结果。然而,这并没有返回整个AsyncResult对象的树,只返回链中第一层的ID(也就是说,不包括和弦中的子任务ID)。

我尝试的第三种方法是自己实现serializable(),但事实证明,原始方法无法深入到超过两层的原因是链的子任务是celery.canvas.chord对象,而不是AsyncResult实例。

问题的一个示例:

chord([
    foo.si(),
    foo.si(),
    foo.si(),
], bar.si() | bar.si())
res = chord.apply_async()
pprint(res.serializable())

打印出以下内容:

(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
  ('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
   [(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
    (('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
    (('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
 None)

第一个ID是回调链的ID,第二个ID来自和弦任务本身,最后三个是和弦内部的实际任务。但我无法获取回调链内部任务的结果(即两个bar.si()调用的ID)。

有没有办法获取实际的任务ID?

2 个回答

0

我有一个嵌套的工作流,这个工作流里有一些任务是分组的,有一些是串联的。下面这个递归的方法可以很好地获取任务的ID和它们的结果:

import celery



def get_task_id_result_tuple_list(run_dag, with_result=True):
    
    task_id_result_list = []
    
    # for groups, parents are first task, then iterate over the children
    if isinstance(run_dag, celery.result.GroupResult):
        entry = (run_dag.parent, run_dag.parent.result) if with_result else run_dag.parent
        task_id_result_list.append(entry)
        children = run_dag.children
        for child in children:
            task_id_result_list.extend(get_task_id_result_tuple_list(child, with_result))

    # for AsyncResults, append parents in reverse
    elif isinstance(run_dag, celery.result.AsyncResult):
        ch = run_dag
        ch_list = [(ch, ch.result)] if with_result else [ch]
        while ch.parent is not None:
            ch = ch.parent
            entry = (ch, ch.result) if with_result else ch
            ch_list.append(entry)
        
        # remember to reverse the list to get the calling order
        task_id_result_list.extend(reversed(ch_list))
        
    return task_id_result_list
    
# dag is the nested celery structure of chains and groups
run_dag = dag.apply_async()

task_id_result_tuples = get_task_id_result_tuple_list(run_dag)
task_id_only = get_task_id_result_tuple_list(run_dag, False)

注意:我还没有用这个方法测试过和和弦(chords)一起使用,但我想它可能可以直接用,或者可能需要加一个条件分支来处理这种情况。

1

一种比较“hacky”的方法是使用apply_async来调用任务,保存任务的ID,然后手动等待这些任务完成。这样你就能完全掌控发生了什么,但你应该只在最后的情况下才等待异步任务。现在你可以访问任务的ID、返回值等等。比如像这样:

 task1 = a_task.apply_async()
 task2 = b_task.apply_async()
 task3 = c_task.apply_async()

 tasks = [task1, task2, task3]

 for task in tasks:
     task.wait()

撰写回答