使用CELERY_ROUTES的celery任务路由不适用于任务子类

2 投票
1 回答
3929 浏览
提问于 2025-04-18 07:49

我正在使用 celery 3.0.20,但在让 celery 任务与不同的队列一起工作时遇到了困难。

我的 celery 工作进程是通过以下队列配置启动的:

-Q:w1 default -Q:w2 longrunning

在 Django 的设置中包含了以下 celery 配置:

CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('longrunning', Exchange('longrunning'), routing_key='longrunning'),
)
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_ROUTES = ({'mytasks.task_a.run': {
                    'queue': 'longrunning',
                    'routing_key': 'longrunning'
             }},

到目前为止,一切正常。所有任务默认都放在 default 队列中,而 task_a 则放在长时间运行的队列里。

task_a 模块的实现如下:

from celery import task

@task
def run():
    # do some task work

现在,我遇到的问题是,另一个任务是作为一个类实现的,这个类继承自 celery.Task

from celery import Task

class AnotherTask(Task):

    def run(self, *args, **kwargs):
        # do some task work

AnotherTask 类现在位于 task_b 模块时,我无法让这个任务在 longrunning 队列中执行:我尝试以不同的方式将其添加到 CELERY_ROUTES 中,但都没有成功:

{'mytasks.task_b': {
    'queue': 'longrunning',
     'routing_key': 'longrunning'
}}

-

{'mytasks.task_b.AnotherTask': {
    'queue': 'longrunning',
     'routing_key': 'longrunning'
}}

-

{'mytasks.task_b.AnotherTask.run': {
    'queue': 'longrunning',
     'routing_key': 'longrunning'
}}

我还尝试切换到默认的交换类型 'topic',但这也没有奏效。

有没有什么提示可以让我在 longrunning 队列中执行 AnotherTask 类的任务?

1 个回答

1

好的,我找到了问题所在。

问题出在所有其他任务都是通过 celerybeat 定期启动的,这些任务会进入 longrunning 队列。而新的任务是从网页环境启动的。结果是,celerybeat 的设置导入了包含 CELERY_ROUTES 字典的设置,但网页环境的设置没有导入。真让人头疼!

撰写回答