芹菜链在组任务完成之前执行子任务

2024-04-20 10:11:46 发布

您现在位置:Python中文网/ 问答频道 /正文

我要做的是执行一组任务,然后将组中每个任务的结果发送到另一个任务,然后将所有这些任务的结果发送到最后一个任务。在

例如

jobgroup = (
           group(tasks.task1.s(p) for p in params) |
           tasks.dmap.s(tasks.task2.s())
          )

这部分对我来说还可以,但问题是如果我想把所有的结果都集中到一个最终的任务中,例如

^{pr2}$

我看到的是,在task2中的任务仍处于挂起状态时调用task3(),方法是在传入时打印出状态(以下片段是我在task3函数中打印的内容:

@task()
def task3(input):
   for item in input.results:
      logger.info(item.status)

记录结果

[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING
[2014-09-04 10:48:41,905: INFO/Worker-7] tasks.task3[27053688-3c5c-4ca5-975f-356a66d55364]: PENDING

那么,如何设置它,以便在jobgroup中的所有任务完成之前不调用task3?在


Tags: ininfoforinput状态groupparamsitem
1条回答
网友
1楼 · 发布于 2024-04-20 10:11:46

Chord:

A chord consists of a header and a body. The header is a group of tasks that MUST COMPLETE before the callback is called. A chord is essentially a callback for a group of tasks.

示例:

>>> from celery import chord
>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

您可以使用chord来完成任务。在

^{pr2}$

相关问题 更多 >