Celery的扇出和后续同步
我正在用Python(2.x)和Celery做一个(至少我觉得是)简单的分发系统。基本的想法是对一组输入文档运行一些算法的不同变体,然后对输出再用另一个算法做同样的事情(这样重复几次)。简单来说就是这样:
doc1
/|\
doc1_1 doc1_2 doc1_3
/|\ /|\ /|\
doc1_1_2 ....
另外,在这个执行链的某个点能够同步一下会很有帮助。虽然这不是必须的,但这样可以减少整体的执行时间。
我尝试在一个任务中创建和执行多个子任务,但我找不到方法来判断这些子任务是否失败或者是否完成。构建大量的任务链看起来有点不优雅(我是C语言程序员,所以不敢自称权威),但至少我能获取到链中每个任务的当前状态。如果我理解那些比较模糊的文档没错的话,我应该可以通过一个包含所有链的“和弦”来实现同步,但似乎没有办法获取每条链的状态。
我对此感到很困惑,因为我对Celery的设计理念不太熟悉,而文档也很稀少,帮不上忙。那么,怎么做才是“正确的”(或者说最不丑陋的)方式呢?
1 个回答
0
我通过建立很多链,然后把它们放到一个组里执行,得到了我想要的结果。我的代码大致是这样的:
res = []
for sequence in product(*input, *action):
method = getattr(tasks, sequence[1]['method'])
ch = chain(method.s(input_document, **(sequence[1])))
for seq in sequence[2:]:
method = getattr(tasks, seq['method'])
ch |= method.s(**seq)
res.append(ch)
r = group(res).apply_async()
r.save()
return r.id
简单来说,就是计算所有输入文档和操作的点积(算法设置),从任务模块中获取相应的方法,把它们全部串联起来,放到一个组里,然后执行。
另外,从GroupResult中获取链的结果似乎可以正常工作,只要你别忘了保存和恢复GroupResult。