使用TQM跟踪芹菜和弦任务的进度?(Python)

2024-05-12 17:49:18 发布

您现在位置:Python中文网/ 问答频道 /正文

有没有办法跟踪和弦的进度,最好是在TQM条中

例如,如果我们采用the documentation exemple,我们将创建以下文件:

#proj/tasks.py

@app.task
def add(x, y):
    return x + y

@app.task
def tsum(numbers):
    return sum(numbers)

然后运行以下脚本:

from celery import chord
from proj.tasks import add, tsum

chord(add.s(i, i)
      for i in range(100))(tsum.s()).get()

我们怎样才能追踪和弦的进展

  • 我们不能使用update_state,因为chord()对象不是函数
  • 我们不能使用collect(),因为chord()(callback)会在结果准备好之前阻止脚本

理想情况下,我会设想类似this custom tqdm subclass for Dask的东西,但是我一直无法找到类似的解决方案

非常感谢任何帮助或暗示


Tags: fromimport脚本addappfortaskreturn
1条回答
网友
1楼 · 发布于 2024-05-12 17:49:18

所以我找到了解决办法

首先,chord()(回调)实际上并不会阻止脚本,只有.get()部分会阻止脚本。将所有任务发布到代理可能需要很长时间。幸运的是,有一种简单的方法可以通过信号跟踪发布过程。我们可以在发布开始之前创建进度条,并修改example handler from the documentation以更新它:

from tqdm import tqdm
from celery.signals import after_task_publish

publish_pbar = tqdm(total=100, desc="Publishing tasks")

@after_task_publish.connect(sender='tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    publish_pbar.update(1)

c = chord(add.s(i, i)
      for i in range(100))(tsum.s())

# The script will resume once all tasks are published so close the pbar
publish_pbar.close()

但是,这只适用于发布任务,因为this signal是在发送任务的信号中执行的。task_success信号在worker进程中执行,因此这个技巧只能在worker日志中使用(据我所知)

因此,为了在发布所有任务并恢复脚本后跟踪进度,我从app.control.inspect().stats()转到worker stats。这将返回一个包含各种统计信息的dict,其中包括已完成的任务。以下是我的实现:

tasks_pbar = tqdm(total=100, desc="Executing tasks")

previous_total = 0
current_total = 0

while current_total<100:

    current_total = 0
    for key in app.control.inspect().stats():
        current_total += app.control.inspect().stats()[key]['total']['tasks.add']

    if current_total > previous_total:
        tasks_pbar.update(current_total-previous_total)

    previous_total = current_total

results = c.get()
tasks_pbar.close()

最后,我认为可能有必要将give names添加到任务中,用于信号处理程序的过滤和stats()dict,因此不要忘记将此添加到任务中:

#proj/tasks.py

@app.task(name='tasks.add')
def add(x, y):
    return x + y

如果有人能找到更好的解决方案,请分享

相关问题 更多 >