在启动时添加Celery队列

1 投票
1 回答
1833 浏览
提问于 2025-04-18 10:38

我想做两件事情:

  1. 我创建了一个工作步骤,用来从远程加载配置。
  2. 我想从配置中获取一个参数,并用这个参数来设置消费者队列的名称。

app.steps['worker'].add(LoadConfig)

这个运行得很好。

但是我无法创建一个 SetQueue 的工作步骤。

现在我的 SetQueue 看起来是这样的:

class SetQueue(bootsteps.StartStopStep):

    requires = (Consumer, )

    def start(self, parent, **kwargs):

        parent.add_task_queue('q_name', exchange='q_name', routing_key='q_name')

app.steps['consumer'].add(SetQueue)

但它并不工作。

我觉得我的问题在于我不明白——在什么时刻(requires=(???, ))可以添加队列。

1 个回答

6

在Celery中,有不同的“蓝图”,每个蓝图由多个启动步骤组成。

第一个启动的蓝图是Worker,它负责启动执行池;最后一个启动的蓝图是Consumer,它连接到消息代理并开始处理任务消费者等。

所以你的requires并不是指向一个启动步骤,而是指向一个蓝图。

我看到你的启动步骤添加了另一个队列来消费,所以让它依赖于Tasks启动步骤会更合理,因为如果你想调用add_task_queue,这个步骤必须在运行中。

class SetQueue(bootsteps.StartStopStep):
    requires = ('celery.worker.consumer:Tasks', )

你可以在这里查看启动组件的概述: http://docs.celeryproject.org/en/latest/userguide/extending.html#blueprints

另外,注册了自定义启动步骤后,你可以使用以下命令生成这个图的新版本:

$ celery -A proj graph bootsteps | dot -Tpng -o steps.png

撰写回答