使用动态routing_key启动工作进程?
我有一个队列,里面有几种不同的任务类型,我需要为特定的任务运行工作程序。就像这样:'celery worker --routing_key task.type1 --app=app'
队列配置:
CELERY_QUEUES = (
Queue('myqueue', routing_key='task.#'),
)
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
使用pika来处理这个任务很简单,可以参考这个链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html,但是用celery该怎么做呢?
1 个回答
3
没问题,你不能把一个工作者绑定到一个路由键上。
- 工作者是从队列中获取消息,而不是从路由键中。
- 生产者会用路由键发送消息,RabbitMQ会把这些消息路由到队列里。
用pika也是不行的。
在教程中,工作者/消费者会把自己的队列绑定到路由键上。
- 生产者发出带有路由键为'info'的日志。
- RabbitMQ会丢弃所有这些日志,直到有一个队列绑定到这个路由键上。
- 接收者创建一个队列A,并把它绑定到路由键'info'。
- 现在RabbitMQ会把路由键为'info'的日志发送到队列A,接收者就可以获取这些日志了。
你可以很容易地用celery来实现这个绑定。
举个例子,你可以在celery的配置文件中这样做:
exchange = Exchange('default', type=topic)
CELERY_QUEUES = (
Queue('all_logs', exchange, routing_key='logs.#'),
Queue('info_logs', exchange, routing_key='logs.info')
)
接收所有日志:
$ celery worker -A receive_logs -Q all_logs
只接收'info'日志(也就是路由键为logs.info的消息)
$ celery worker -A receive_logs -Q info_logs
最后,你启动了一个只处理特定路由键消息的工作者,这正是你想要的。
注意:info日志在两个队列中都是重复的:Queue:all_logs 和 Queue:info_logs。
你可能会对这个链接感兴趣: http://docs.celeryproject.org/en/latest/configuration.html?highlight=direct#std:setting-CELERY_WORKER_DIRECT