Django/Celery 本地多队列 - 路由不工作

14 投票
2 回答
13153 浏览
提问于 2025-04-18 03:06

我按照celery的文档在我的开发机器上定义了两个队列。

我的celery设置如下:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}

我在项目的虚拟环境中打开了两个终端窗口,并运行了以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker

结果是所有任务都被两个队列处理了。

我的目标是让一个队列只处理在CELERY_ROUTES中定义的那个任务,而默认队列处理其他所有任务。

我还参考了这个StackOverflow的问题,运行rabbitmqctl list_queues返回celery 0,而运行rabbitmqctl list_bindings返回exchange celery queue celery []两次。重启rabbit服务器也没有改变任何情况。

2 个回答

3

除了已经被接受的答案,如果有人来到这里,还是在想为什么他的设置没有生效(就像我刚才一样),那么这里有个原因:celery的文档没有正确列出设置的名称。

对于celery 5.0.5的设置,CELERY_DEFAULT_QUEUECELERY_QUEUESCELERY_ROUTES应该改成CELERY_TASK_DEFAULT_QUEUECELERY_TASK_QUEUESCELERY_TASK_ROUTES。这些是我测试过的设置,但我猜同样的规则也适用于交换和路由键。

28

好的,我搞明白了。以下是我的整个设置、配置以及如何运行celery,供那些可能对我的问题有同样疑问的人参考。

设置

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}

如何运行celery?

终端 - 标签 1:

celery -A proj worker -Q default -l debug -n default_worker

这将启动第一个工作进程,它会从默认队列中获取任务。注意!-n default_worker 对于第一个工作进程不是必须的,但如果你有其他celery实例在运行,这个参数就是必须的。设置 -n worker_name--hostname=default@%h 是一样的。

终端 - 标签 2:

celery -A proj worker -Q feeds -l debug -n feeds_worker

这将启动第二个工作进程,它会从feeds队列中获取任务。注意 -n feeds_worker,如果你使用 -l debug(日志级别 = debug),你会看到两个工作进程之间在同步。

终端 - 标签 3:

celery -A proj beat -l debug

这将启动beat,根据你在 CELERYBEAT_SCHEDULE 中的计划执行任务。我不需要更改任务或 CELERYBEAT_SCHEDULE

例如,这就是我为应该进入feeds队列的任务设置的 CELERYBEAT_SCHEDULE

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}

如你所见,不需要添加 'options': {'routing_key': 'long_tasks'} 或指定任务应该去哪个队列。此外,如果你在想为什么 Update 是大写的,那是因为它是一个自定义任务,定义为 celery.Task 的子类。

更新 Celery 5.0+

自从5.0版本以来,Celery做了一些更改,这里是任务路由的更新设置。

如何创建队列?

Celery可以自动创建队列。对于简单的情况,它的默认路由值工作得很好。

task_create_missing_queues=True,或者如果你使用django设置并将所有celery配置命名空间放在 CELERY_ 关键字下,使用 CELERY_TASK_CREATE_MISSING_QUEUES=True。注意,这个选项默认是开启的。

自动调度任务路由

在配置celery应用后:

celery_app.conf.beat_schedule = {
  "some_scheduled_task": {
    "task": "module.path.some_task",
    "schedule": crontab(minute="*/10"),
    "options": {"queue": "queue1"}
  }
}

自动任务路由

Celery应用仍然需要先配置,然后:

app.conf.task_routes = {
  "module.path.task2": {"queue": "queue2"},
}

手动任务路由

如果你想动态路由任务,那么在发送任务时指定队列:

from module import task

def do_work():
  # do some work and launch the task
  task.apply_async(args=(arg1, arg2), queue="queue3")

关于路由的更多细节可以在这里找到: https://docs.celeryproject.org/en/stable/userguide/routing.html

关于调用任务的内容可以在这里找到: https://docs.celeryproject.org/en/stable/userguide/calling.html

撰写回答