Celery的扇出和后续同步

2 投票
1 回答
622 浏览
提问于 2025-04-18 18:03

我正在用Python(2.x)和Celery做一个(至少我觉得是)简单的分发系统。基本的想法是对一组输入文档运行一些算法的不同变体,然后对输出再用另一个算法做同样的事情(这样重复几次)。简单来说就是这样:

       doc1
        /|\
doc1_1 doc1_2 doc1_3
  /|\  /|\      /|\
doc1_1_2 ....

另外,在这个执行链的某个点能够同步一下会很有帮助。虽然这不是必须的,但这样可以减少整体的执行时间。

我尝试在一个任务中创建和执行多个子任务,但我找不到方法来判断这些子任务是否失败或者是否完成。构建大量的任务链看起来有点不优雅(我是C语言程序员,所以不敢自称权威),但至少我能获取到链中每个任务的当前状态。如果我理解那些比较模糊的文档没错的话,我应该可以通过一个包含所有链的“和弦”来实现同步,但似乎没有办法获取每条链的状态。

我对此感到很困惑,因为我对Celery的设计理念不太熟悉,而文档也很稀少,帮不上忙。那么,怎么做才是“正确的”(或者说最不丑陋的)方式呢?

1 个回答

0

我通过建立很多链,然后把它们放到一个组里执行,得到了我想要的结果。我的代码大致是这样的:

res = []
for sequence in product(*input, *action):
    method = getattr(tasks, sequence[1]['method'])
    ch = chain(method.s(input_document, **(sequence[1])))
    for seq in sequence[2:]:
        method = getattr(tasks, seq['method'])
        ch |= method.s(**seq)
    res.append(ch)
r = group(res).apply_async()
r.save()
return r.id

简单来说,就是计算所有输入文档和操作的点积(算法设置),从任务模块中获取相应的方法,把它们全部串联起来,放到一个组里,然后执行。

另外,从GroupResult中获取链的结果似乎可以正常工作,只要你别忘了保存和恢复GroupResult。

撰写回答