获取嵌套链和和弦中的所有任务ID
我正在使用Celery 3.1.9和Redis作为后端。我正在运行的任务由几个子任务组成,这些子任务以“和弦”和“链”的形式运行。它的结构大致是这样的:
- 准备
- 下载数据(两个工作者的和弦)
- 解析并存储下载的数据
- 一个长时间运行的和弦(四个工作者)
- 完成
- 生成报告
列表中的每一项都是一个子任务,它们都是串联在一起的。步骤2和4是和弦。整个流程是通过为步骤4创建一个和弦来连接的,这个和弦的回调是步骤4到步骤6的链,然后为步骤2创建一个和弦,它的回调是步骤3到第一个和弦。最后,创建一个链来连接步骤1到第二个和弦。这个链通过delay()启动,并将其ID存储在数据库中。
问题有两个方面。首先,我希望能够撤销整个任务,其次,我想在我的Task
类中有一个自定义的on_failure方法,用于清理工作,并向用户报告失败情况。
目前我存储了链的任务ID。我本以为可以用这个ID来撤销链。此外,如果出现错误,我想在on_failure
处理程序中遍历链到根节点,以从数据库中检索相关记录。但这行不通,因为当你仅用任务的ID重新创建AsyncResult
的实例时,它的父属性是None
。
我尝试的第二种方法是存储在外部链结果上调用serializable()
的结果。然而,这并没有返回整个AsyncResult
对象的树,只返回链中第一层的ID(也就是说,不包括和弦中的子任务ID)。
我尝试的第三种方法是自己实现serializable()
,但事实证明,原始方法无法深入到超过两层的原因是链的子任务是celery.canvas.chord
对象,而不是AsyncResult
实例。
问题的一个示例:
chord([
foo.si(),
foo.si(),
foo.si(),
], bar.si() | bar.si())
res = chord.apply_async()
pprint(res.serializable())
打印出以下内容:
(('50c9eb94-7a63-49dc-b491-6fce5fed3713',
('d95a82b7-c107-4a2c-81eb-296dc3fb88c3',
[(('7c72310b-afc7-4010-9de4-e64cd9d30281', None), None),
(('2cb80041-ff29-45fe-b40c-2781b17e59dd', None), None),
(('e85ab83d-dd44-44b5-b79a-2bbf83c4332f', None), None)])),
None)
第一个ID是回调链的ID,第二个ID来自和弦任务本身,最后三个是和弦内部的实际任务。但我无法获取回调链内部任务的结果(即两个bar.si()调用的ID)。
有没有办法获取实际的任务ID?
2 个回答
我有一个嵌套的工作流,这个工作流里有一些任务是分组的,有一些是串联的。下面这个递归的方法可以很好地获取任务的ID和它们的结果:
import celery
def get_task_id_result_tuple_list(run_dag, with_result=True):
task_id_result_list = []
# for groups, parents are first task, then iterate over the children
if isinstance(run_dag, celery.result.GroupResult):
entry = (run_dag.parent, run_dag.parent.result) if with_result else run_dag.parent
task_id_result_list.append(entry)
children = run_dag.children
for child in children:
task_id_result_list.extend(get_task_id_result_tuple_list(child, with_result))
# for AsyncResults, append parents in reverse
elif isinstance(run_dag, celery.result.AsyncResult):
ch = run_dag
ch_list = [(ch, ch.result)] if with_result else [ch]
while ch.parent is not None:
ch = ch.parent
entry = (ch, ch.result) if with_result else ch
ch_list.append(entry)
# remember to reverse the list to get the calling order
task_id_result_list.extend(reversed(ch_list))
return task_id_result_list
# dag is the nested celery structure of chains and groups
run_dag = dag.apply_async()
task_id_result_tuples = get_task_id_result_tuple_list(run_dag)
task_id_only = get_task_id_result_tuple_list(run_dag, False)
注意:我还没有用这个方法测试过和和弦(chords)一起使用,但我想它可能可以直接用,或者可能需要加一个条件分支来处理这种情况。
一种比较“hacky”的方法是使用apply_async来调用任务,保存任务的ID,然后手动等待这些任务完成。这样你就能完全掌控发生了什么,但你应该只在最后的情况下才等待异步任务。现在你可以访问任务的ID、返回值等等。比如像这样:
task1 = a_task.apply_async()
task2 = b_task.apply_async()
task3 = c_task.apply_async()
tasks = [task1, task2, task3]
for task in tasks:
task.wait()