Celery 链式执行子任务时,是否在组任务完成前执行
我想做的是执行一组任务,然后把每个任务的结果发送到另一个任务,最后把所有这些任务的结果发送到一个最终的任务。
比如:
jobgroup = (
group(tasks.task1.s(p) for p in params) |
tasks.dmap.s(tasks.task2.s())
)
这部分对我来说是可以正常工作的,但我遇到的问题是,如果我想把所有结果放到一个最终任务中,比如:
mychain = chain(jobgroup | tasks.task3.s())
我看到的是,task3() 在 task2 的任务还在等待状态时就被调用了,我是通过打印状态来查看这些任务的(以下代码是我在 task3 函数中打印的内容):
@task()
def task3(input):
for item in input.results:
logger.info(item.status)
记录结果
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
那么我该怎么设置,才能确保 task3 在 jobgroup 中的所有任务完成后再被调用呢?
1 个回答
1
一个Chord由两个部分组成:头部和主体。头部是一些任务,这些任务必须在回调函数被调用之前完成。简单来说,Chord就是一组任务的回调。
例子:
>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())
你可以使用 chord
来完成这个任务。
from celery import chord, group, chain
task_1 = group(tasks.task1.s(p) for p in params)
task_2 = group(tasks.dmap.s(tasks.task2.s())
task_3 = tasks.task3.s()
workflow = chord(chain([task_1, task_2]))(task_3)