测试递归的Celery任务
我有一个函数,我们叫它 fob
。这个函数的任务就是检查一些事情,确保它们都正常,然后再调用另一个Celery任务进行进一步处理。
其中有一个变量是模糊的。如果这个变量不符合预期,fob
就会像这样重新安排自己:fob.delay(whatever)
,直到达到某个限制。
我想测试一下 fob
。但我不关心它重新执行后的情况。我只想确保 fob
能用相同的参数重新安排自己,并且自我控制计数器会增加。
我该怎么做呢?
1 个回答
0
写一个叫做 check_all
的函数,这个函数会检查所有的东西,如果一切正常,就返回 True
。
def check_all():
# returns true if all conditions are satisfied.
return True
现在写你的 fob
函数,这个函数也会检查所有的东西,并且会重新安排自己。
def fob(*args, **kwargs):
if check_all():
#go to next task if everything is ok
call_next_task.delay()
else:
# call itself again
fob.delay(*args, **kwargs)