如何在Celery中将任务链路路由到特定队列?
当我把一个任务发送到特定的队列时,它是可以正常工作的:
task.apply_async(queue='beetroot')
但是如果我创建一个任务链:
chain = task | task
然后我写:
chain.apply_async(queue='beetroot')
看起来它忽略了队列的设置,反而把任务分配到了默认的'celery'队列。
如果celery能支持在任务链中进行路由就好了——这样所有任务都能在同一个队列中按顺序执行。
3 个回答
10
这段话有点晚了,但我觉得@mpaf提供的代码并不完全正确。
背景:在我的情况下,我有两个子任务,第一个子任务会返回一个值,这个值会作为输入传递给第二个子任务。我在让第二个任务执行时遇到了问题——我在日志中看到Celery会把第二个任务认作第一个任务的回调,但第二个任务就是不执行。
这是我之前不工作的链式代码 -:
from celery import chain
chain(
module.task1.s(arg),
module.task2.s()
).apply_async(countdown=0.1, queue='queuename')
使用@mpaf回答中提供的语法,我让两个任务都执行了,但执行顺序很乱,第二个子任务没有被认作第一个的回调。于是我决定去查文档,看看怎么明确地为子任务设置队列。
这是我现在能正常工作的代码 -:
chain(
module.task1.s(arg).set(queue='queuename'),
module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
21
我这样做:
subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
12
好的,我搞明白这个问题了。
你需要在子任务的定义中添加一些必要的执行选项,比如 queue= 或 countdown=,或者通过一个部分来添加:
子任务定义:
from celery import subtask
chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')
部分:
chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')
然后你可以通过以下方式执行这个链:
chain.apply_async()
或者,
chain.delay()
这样任务就会被发送到“beetroot”队列中。最后这个命令中的额外执行参数不会起作用。如果能在链(或者组,或者其他任何画布元素)层面应用这些执行参数,那就太好了。