测试递归的Celery任务

1 投票
1 回答
961 浏览
提问于 2025-04-18 14:35

我有一个函数,我们叫它 fob。这个函数的任务就是检查一些事情,确保它们都正常,然后再调用另一个Celery任务进行进一步处理。

其中有一个变量是模糊的。如果这个变量不符合预期,fob 就会像这样重新安排自己:fob.delay(whatever),直到达到某个限制。

我想测试一下 fob。但我不关心它重新执行后的情况。我只想确保 fob 能用相同的参数重新安排自己,并且自我控制计数器会增加。

我该怎么做呢?

1 个回答

0

写一个叫做 check_all 的函数,这个函数会检查所有的东西,如果一切正常,就返回 True

def check_all():
    # returns true if all conditions are satisfied.
    return True

现在写你的 fob 函数,这个函数也会检查所有的东西,并且会重新安排自己。

def fob(*args, **kwargs):
    if check_all():
        #go to next task if everything is ok
        call_next_task.delay()
    else:
        # call itself again
        fob.delay(*args, **kwargs)

撰写回答