使用动态routing_key启动工作进程?

1 投票
1 回答
1144 浏览
提问于 2025-04-20 10:06

我有一个队列,里面有几种不同的任务类型,我需要为特定的任务运行工作程序。就像这样:'celery worker --routing_key task.type1 --app=app'

队列配置:

CELERY_QUEUES = (
    Queue('myqueue',    routing_key='task.#'),
)
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'

使用pika来处理这个任务很简单,可以参考这个链接:http://www.rabbitmq.com/tutorials/tutorial-five-python.html,但是用celery该怎么做呢?

1 个回答

3

没问题,你不能把一个工作者绑定到一个路由键上。

  • 工作者是从队列中获取消息,而不是从路由键中。
  • 生产者会用路由键发送消息,RabbitMQ会把这些消息路由到队列里。

用pika也是不行的。

在教程中,工作者/消费者会把自己的队列绑定到路由键上。

  1. 生产者发出带有路由键为'info'的日志。
  2. RabbitMQ会丢弃所有这些日志,直到有一个队列绑定到这个路由键上。
  3. 接收者创建一个队列A,并把它绑定到路由键'info'。
  4. 现在RabbitMQ会把路由键为'info'的日志发送到队列A,接收者就可以获取这些日志了。

你可以很容易地用celery来实现这个绑定。

举个例子,你可以在celery的配置文件中这样做:

 exchange = Exchange('default', type=topic)
 CELERY_QUEUES = (
   Queue('all_logs', exchange, routing_key='logs.#'),
   Queue('info_logs', exchange, routing_key='logs.info')
  )

接收所有日志:

$ celery worker -A receive_logs -Q all_logs

只接收'info'日志(也就是路由键为logs.info的消息)

$ celery worker -A receive_logs -Q info_logs

最后,你启动了一个只处理特定路由键消息的工作者,这正是你想要的。

注意:info日志在两个队列中都是重复的:Queue:all_logs 和 Queue:info_logs。

你可能会对这个链接感兴趣: http://docs.celeryproject.org/en/latest/configuration.html?highlight=direct#std:setting-CELERY_WORKER_DIRECT

撰写回答