如何追踪由celery中的和弦标题组成的组内每个任务的进度?

2024-03-28 17:55:29 发布

您现在位置:Python中文网/ 问答频道 /正文

import celery
def temptask(n):
    header=list(tempsubtask.si(i) for i in range(n))
    callback=templink.si('printed at last?')
    r = celery.chord(celery.group(header))(callback)
    return r

@task()
def tempsubtask(i):
    print i    
    for x in range(i):
        time.sleep(2)
        current_task.update_state(
            state='PROGRESS', meta={'completed': x, 'total': i })

@task()
def templink(x):
    print 'this should be run at last %s'%x

#executing temptask
r = temptask(100)

我想访问由tempsubtask更新的进度状态。我怎样才能实现它呢?在


Tags: infortaskdefcallbackrangeatcelery
2条回答

经过几个小时的谷歌搜索,我偶然发现了http://www.manasupo.com/2012/03/chord-progress-in-celery.html。虽然这个解决方案对我来说不是开箱即用的,但它确实激励我尝试类似的方法。在

from celery.utils import uuid
from celery import chord

class ProgressChord(chord):

    def __call__(self, body=None, **kwargs):
        _chord = self.type
        body = (body or self.kwargs['body']).clone()
        kwargs = dict(self.kwargs, body=body, **kwargs)
        if _chord.app.conf.CELERY_ALWAYS_EAGER:
            return self.apply((), kwargs)
        callback_id = body.options.setdefault('task_id', uuid())
        r= _chord(**kwargs)
        return _chord.AsyncResult(callback_id), r

而不是执行芹菜。和弦我使用ProgressChord如下:

^{pr2}$

r的返回值包含一个元组,该元组同时具有回调的asyncresult和组结果。所以成功是这样的:

In [3]: r
Out[3]: 
(<AsyncResult: bf87507c-14cb-4ac4-8070-d32e4ff326a6>,
 <GroupResult: af69e131-5a93-492d-b985-267484651d95 [4672cbbb-8ec3-4a9e-971a-275807124fae, a236e55f-b312-485c-a816-499d39d7de41, e825a072-b23c-43f2-b920-350413fd5c9e, e3f8378d-fd02-4a34-934b-39a5a735871d, c4f7093b-9f1a-4e5e-b90d-66f83b9c97c4, d5c7dc2c-4e10-4e71-ba2b-055a33e15f02, 07b1c6f7-fe95-4c1f-b0ba-6bc82bceaa4e, 00966cb8-41c2-4e95-b5e7-d8604c000927, e039c78e-6647-4c8d-b59b-e9baf73171a0, 6cfdef0a-25a2-4905-a40e-fea9c7940044]>)

我继承并重写了[celery.chord][1]而不是{},因为我在任何地方都找不到它的源代码。在

我也有类似的问题。网上的大多数例子都已经过时了,文档没有多大帮助,但是文档中有指向源代码的链接,阅读确实对我有帮助。 我的目标是分组组织并行任务。这些组必须按顺序执行。 所以我决定在分别开始任何任务之前生成任务id,并且只分配它们。我用的是芹菜4.3.0

下面是一个简单的例子。在

首先,我需要一个虚拟任务使执行顺序化,并能够检查某个组的状态。由于这是一个回调函数,它只会在组中的所有其他任务之后完成。在

@celery.task(bind=True, name="app.tasks.dummy_task")
def dummy_task( self, results=None, *args, **kwargs ):
    return results

我在这里的评论解释了如何分配id。在

^{pr2}$

这就是我可以检查我的应用程序中任何任务的状态的方法。在

# This is a simplified example
# some code is omitted
from celery.result import AsyncResult


def task_status( task_id=None ):

    # PENDING
    # RECEIVED
    # STARTED
    # SUCCESS
    # FAILURE
    # REVOKED
    # RETRY

    task = AsyncResult(task_id)

    response = {
      'state': task.state,
    }

    return jsonify(response), 200

相关问题 更多 >