Python芹菜将任务路由到特定RabbitMQ队列时的奇怪行为

2024-04-26 14:37:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我为此绞尽脑汁了一段时间,但还是不知道问题出在哪里。我正在尝试将不同的任务路由到它们自己的特定队列。在RabbitMQ中,我有默认的celery队列和另一个名为test_queue的队列。我可以很好地将任务推入celery,但是我很难弄清楚如何将任务推入test_queue队列。我已经阅读了芹菜设置task_default_queuetask_queuetask_routes的文档。你知道吗

celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('celery_test',
    broker_url='amqp://',
    backend='amqp://',
    include=['celery_test.tasks'],
    worker_max_tasks_per_child=1,
    task_create_missing_queues=True,
    #task_queues = {'test_queue': {'exchange': 'test_queue', 'routing_key': 'test_queue'}},
    #task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}
    #task_default_queue='test_queue'
    )

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600,)
app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}

if __name__ == '__main__':
    app.start()

我注意到,当我在Celery构造函数中指定test_queue队列名称时,我的任务没有正确地路由到正确的RabbitMQ队列。相反,我不得不使用app.conf.task_routes = {'celery_test.tasks.test': {'queue': 'test_queue'}}来完成工作。我知道这看起来很琐碎,但我已经为此绞尽脑汁数小时了,我不明白为什么一种方法有效,而另一种方法无效。你知道吗

这是我的芹菜配置.py你知道吗

    broker_url="amqp://"
    result_backend="amqp://"
    include=["tasks"]
    task_acks_late=True
    task_default_rate_limit="150/m"
    task_time_limit=300
    worker_prefetch_multiplier=1
    worker_max_tasks_per_child=2
    task_routes={'tasks.test': {'queue': 'test_queue'}}

我正在使用

  • 4.3.0(大黄)
  • Python 3.5.2版

Tags: testimportappdefault路由amqptask队列
1条回答
网友
1楼 · 发布于 2024-04-26 14:37:02

据我所知,Celery4.x(class)构造函数不接受您提到的那些参数(task\u queues、task\u routes和task\u default\u queue)。你知道吗

相关问题 更多 >