如何在Celery中将任务链路路由到特定队列?

20 投票
3 回答
13041 浏览
提问于 2025-04-17 16:20

当我把一个任务发送到特定的队列时,它是可以正常工作的:

task.apply_async(queue='beetroot')

但是如果我创建一个任务链:

chain = task | task

然后我写:

chain.apply_async(queue='beetroot')

看起来它忽略了队列的设置,反而把任务分配到了默认的'celery'队列。

如果celery能支持在任务链中进行路由就好了——这样所有任务都能在同一个队列中按顺序执行。

3 个回答

10

这段话有点晚了,但我觉得@mpaf提供的代码并不完全正确。

背景:在我的情况下,我有两个子任务,第一个子任务会返回一个值,这个值会作为输入传递给第二个子任务。我在让第二个任务执行时遇到了问题——我在日志中看到Celery会把第二个任务认作第一个任务的回调,但第二个任务就是不执行。

这是我之前不工作的链式代码 -:

from celery import chain

chain(
    module.task1.s(arg),
    module.task2.s()
).apply_async(countdown=0.1, queue='queuename')

使用@mpaf回答中提供的语法,我让两个任务都执行了,但执行顺序很乱,第二个子任务没有被认作第一个的回调。于是我决定去查文档,看看怎么明确地为子任务设置队列。

这是我现在能正常工作的代码 -:

chain(
    module.task1.s(arg).set(queue='queuename'),
    module.task2.s().set(queue='queuename')
).apply_async(countdown=0.1)
21

我这样做:

subtask = task.s(*myargs, **mykwargs).set(queue=myqueue)
mychain = celery.chain(subtask, subtask2, ...)
mychain.apply_async()
12

好的,我搞明白这个问题了。

你需要在子任务的定义中添加一些必要的执行选项,比如 queue= 或 countdown=,或者通过一个部分来添加:

子任务定义:

from celery import subtask

chain = subtask('task', queue = 'beetroot') | subtask('task', queue = 'beetroot')

部分:

chain = task.s().apply_async(queue = 'beetroot') | task.s().apply_async(queue = 'beetroot')

然后你可以通过以下方式执行这个链:

chain.apply_async()

或者,

chain.delay()

这样任务就会被发送到“beetroot”队列中。最后这个命令中的额外执行参数不会起作用。如果能在链(或者组,或者其他任何画布元素)层面应用这些执行参数,那就太好了。

撰写回答