如何使用Celery有条件地执行任务?

1 投票
2 回答
1898 浏览
提问于 2025-04-18 10:53

我有一个像这样的celery任务队列:

from __future__ import absolute_import

from proj.celery import app


@app.task
def add(x, y):
    return x + y


@app.task
def mul(x, y):
    return x * y


@app.task
def xsum(numbers):
    return sum(numbers)

现在我想在只有前面两个任务 addmul 都成功完成的情况下,才调用第三个任务 xsum。我该如何使用celery来实现这个呢?

2 个回答

0

你需要保存结果,并在xsum函数中检查这些结果。

3

你可以使用一些像 chainchord 的工具,具体可以参考文档。简单来说,chain 是按顺序执行所有任务,而 chord 则由一个头部(也就是一组任务)和一个回调(最后的任务)组成,只有当这组任务完成后,回调才会执行。例如,你可以这样做:

callback = xsum.s()
header = [add.s(1, 1), mul.s(2, 2)]
result = chord(header)(callback)
result.get()

关于错误处理,我直接引用文档的内容:

从3.1版本开始,错误会传递到回调函数,这样回调就不会执行了,而是会变成失败状态,并且错误会被设置为 ChordError 异常:

如果你使用的是3.0.14或更高版本,可以通过设置 CELERY_CHORD_PROPAGATES 来启用这种新行为:

CELERY_CHORD_PROPAGATES = True

撰写回答