celery - 链接组和子任务 -> 执行顺序错误

31 投票
2 回答
22355 浏览
提问于 2025-04-17 17:22

当我有如下的情况

group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())

workflow = chain(group1, group2, task3.si())

直观上理解,task3 应该在组2里的所有任务完成后才执行。

但实际上,task3 在组1的任务开始但还没完成的时候就执行了。

我哪里做错了呢?

2 个回答

23

我也遇到了和celery一样的问题,想要实现一个工作流程,第一步是“生成一百万个任务”。我尝试了分组、子任务,结果我的第二步在第一步还没完成的时候就开始了。

长话短说,我可能找到了一个解决办法,使用了“和弦”(chords)和一个简单的结束器:

@celery.task
def chordfinisher( *args, **kwargs ):
  return "OK"

这个结束器没做什么特别的事情,但它让我能够做到这一点:

tasks = []
for id in ids:
    tasks.append( mytask.si( id ) )
step1 = chord( group( tasks ), chordfinisher.si() )

step2 = ...

workflow = chain( step1, step2 )

最开始我想把第一步放在一个子任务里,但由于我猜测的原因,调用一个组的操作结束后,任务就被认为完成了,然后我的工作流程就继续往下走了……

如果有人有更好的办法,我很感兴趣!

27

所以,实际上在celery中,你不能把两个组连接在一起。
我猜这是因为连接的组和任务会自动变成一个和弦(chord)。
--> Celery文档: http://docs.celeryproject.org/en/latest/userguide/canvas.html

把一个组和另一个任务连接在一起会自动升级成一个和弦:

组会返回一个父任务。当把两个组连接在一起时,我猜当第一个组完成后,和弦会启动回调“任务”。我怀疑这个“任务”实际上是第二个组的“父任务”。我进一步猜测,这个父任务在启动完组内所有子任务后就会完成,因此第二组之后的下一个项目会被执行。

为了演示这一点,这里有一些示例代码。你需要先有一个正在运行的celery实例。

# celery_experiment.py

from celery import task, group, chain, chord
from celery.signals import task_sent, task_postrun, task_prerun

import time
import logging

import random
random.seed()

logging.basicConfig(level=logging.DEBUG)

### HANDLERS ###    
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):    
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass

@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    try:    
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)

@task()
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')

    grp1 = group(st_arr)
    grp2 = group(st_arr2)

    # chn can chain two groups together because they are seperated by a single subtask
    chn = (st | grp1 | st2 | grp2 | st3 | st4)

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes
    #chn2 = (st | st2 | grp1 | grp2 | st3 |  st4)

    r = chn()
    #r2 = chn2()

撰写回答