Celery 链式执行子任务时,是否在组任务完成前执行

2 投票
1 回答
1303 浏览
提问于 2025-04-19 17:28

我想做的是执行一组任务,然后把每个任务的结果发送到另一个任务,最后把所有这些任务的结果发送到一个最终的任务。

比如:

jobgroup = (
           group(tasks.task1.s(p) for p in params) |
           tasks.dmap.s(tasks.task2.s())
          )

这部分对我来说是可以正常工作的,但我遇到的问题是,如果我想把所有结果放到一个最终任务中,比如:

mychain = chain(jobgroup | tasks.task3.s())

我看到的是,task3() 在 task2 的任务还在等待状态时就被调用了,我是通过打印状态来查看这些任务的(以下代码是我在 task3 函数中打印的内容):

@task()
def task3(input):
   for item in input.results:
      logger.info(item.status)

记录结果

[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING

那么我该怎么设置,才能确保 task3 在 jobgroup 中的所有任务完成后再被调用呢?

1 个回答

1

Chord:

一个Chord由两个部分组成:头部和主体。头部是一些任务,这些任务必须在回调函数被调用之前完成。简单来说,Chord就是一组任务的回调。

例子:

>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

你可以使用 chord 来完成这个任务。

from celery import chord, group, chain

task_1 = group(tasks.task1.s(p) for p in params)
task_2 = group(tasks.dmap.s(tasks.task2.s())
task_3 = tasks.task3.s()

workflow = chord(chain([task_1, task_2]))(task_3)

撰写回答