如何跟踪celery.group任务的进度?

11 投票
3 回答
15348 浏览
提问于 2025-04-17 17:25
@celery.task
def my_task(my_object):
    do_something_to_my_object(my_object)


#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
group_task = tasks.apply_async()

问题:celery有没有什么方法可以检测一个组任务的进度?我能知道一共有多少个任务,以及有多少个任务已经处理完了吗?

3 个回答

1

在阅读关于 AsyncResult 的文档时,发现有一个 collect 方法,它可以在结果到达时进行收集。

http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect

from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

示例输出:

>>> from celery.result import ResultBase
>>> from proj.tasks import A

>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

注意: 要使 result.children 中存储子任务的列表,必须启用 Task.trail 选项。这个选项默认是开启的,但为了说明这里特别提到。

补充说明:

经过进一步测试,我发现虽然 collect 表示它会收集结果,但它仍然会等待。我发现要获取进度,你需要获取子任务的结果,像这样:

group_result = mygrouptask.delay().get()
for result in tqdm(group_result.children, total=count):
    yield result.get()

tqdm 可以在控制台中显示进度

mygrouptask 返回一个 celery 组,像这样:

return group(mytask.s(arg) for arg in args)()
5

这里有一个完整的示例,基于@dalore的回答。

首先是 tasks.py 文件。

import time
from celery import Celery, group

app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')

@app.task(trail=True)
def add(x, y):
    time.sleep(1)
    return x + y

@app.task(trail=True)
def group_add(l1, l2):
    return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()

使用Docker启动Redis服务器:docker run --name my-redis -p 6379:6379 -d redis

使用Docker启动RabbitMQ:docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine

然后在一个新的命令行窗口中启动一个单独的Celery工作进程:celery -A tasks worker --loglevel=info -c 1

接下来运行下面的测试脚本。

from tasks import group_add
from tqdm import tqdm

total = 10

l1 = range(total)
l2 = range(total)
delayed_results = group_add.delay(l1, l2)
delayed_results.get()  # Wait for parent task to be ready.

results = []
for result in tqdm(delayed_results.children[0], total=total):
    results.append(result.get())
print(results)

你应该会看到类似下面的内容,进度条每秒增加10%。

50%|#####     | 5/10 [00:05<00:05,  1.01s/it
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

最后,清理你的Redis和RabbitMQ容器。

docker stop my-rabbit my-redis
docker rm my-rabbit my-redis
6

在使用命令行(ipython的自动补全功能)时,我发现了一个叫做 group_task 的东西(它是一个 celery.result.ResultSet 对象),里面有一个方法叫 completed_count,这个方法正好提供了我需要的信息。

我还找到了相关的文档,地址是 http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count

撰写回答