<p>假设您有以下任务:</p>
<pre><code>celery = Celery(
broker="amqp://test:test@localhost:5672/test"
)
celery.conf.update(
CELERY_RESULT_BACKEND = "mongodb",
)
@celery.task
def task_a(result):
print 'task_a:', result
return result
@celery.task
def task_b(result):
print 'task_b:', result
return result
@celery.task
def task_c(result):
print 'task_c:', result
return result
@celery.task
def notify_user(result):
print result
return result
</code></pre>
<p>对于给定的输入数据(绘制时):</p>
^{pr2}$
<p>您可以:</p>
<pre><code> a_group = []
for ia, a in enumerate(tree):
print "A%s:" % ia
b_group = []
for ib, b in enumerate(a):
print " - B%s:" % ib
for c in b:
print ' -', c
c_group = group([task_c.s(c) for c in b])
b_group.append(c_group | task_b.s())
a_group.append(group(b_group) | task_a.s())
final_task = group(a_group) | notify_user.s()
</code></pre>
<p>它的表现是(别看,很难看:)</p>
<pre><code>[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()
</code></pre>
<p>传递给notify_user的数据将是:</p>
<pre><code>[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]
</code></pre>
<p>一切都是通过回调(和弦)运行的,因此没有任务等待其他任务。在</p>