在启动时添加Celery队列
我想做两件事情:
- 我创建了一个工作步骤,用来从远程加载配置。
- 我想从配置中获取一个参数,并用这个参数来设置消费者队列的名称。
app.steps['worker'].add(LoadConfig)
这个运行得很好。
但是我无法创建一个 SetQueue 的工作步骤。
现在我的 SetQueue 看起来是这样的:
class SetQueue(bootsteps.StartStopStep):
requires = (Consumer, )
def start(self, parent, **kwargs):
parent.add_task_queue('q_name', exchange='q_name', routing_key='q_name')
app.steps['consumer'].add(SetQueue)
但它并不工作。
我觉得我的问题在于我不明白——在什么时刻(requires=(???, ))可以添加队列。
1 个回答
6
在Celery中,有不同的“蓝图”,每个蓝图由多个启动步骤组成。
第一个启动的蓝图是Worker,它负责启动执行池;最后一个启动的蓝图是Consumer
,它连接到消息代理并开始处理任务消费者等。
所以你的requires
并不是指向一个启动步骤,而是指向一个蓝图。
我看到你的启动步骤添加了另一个队列来消费,所以让它依赖于Tasks
启动步骤会更合理,因为如果你想调用add_task_queue
,这个步骤必须在运行中。
class SetQueue(bootsteps.StartStopStep):
requires = ('celery.worker.consumer:Tasks', )
你可以在这里查看启动组件的概述: http://docs.celeryproject.org/en/latest/userguide/extending.html#blueprints
另外,注册了自定义启动步骤后,你可以使用以下命令生成这个图的新版本:
$ celery -A proj graph bootsteps | dot -Tpng -o steps.png