如何布局队列/工作者结构以支持多个环境的大任务?

12 投票
2 回答
1120 浏览
提问于 2025-04-15 22:25

我们有一个基于Python/Django/Celery的部署工具,具体的设置如下:

  1. 我们现在使用的是默认的Celery设置,也就是一个叫“celery”的队列和交换机。
  2. 队列中的每个任务代表一个部署操作。
  3. 每个环境的任务最后都有一个同步阶段,这个阶段可能会耗费很长时间。

我们需要满足以下要求:

  1. 并发性:多个环境的任务应该可以同时进行。
  2. 锁定:每个环境最多只能有一个任务在运行,也就是说环境之间是互相锁定的。
  3. 吞吐量优化:如果一个环境有多个任务,它们的同步阶段可以合并来优化。所以如果一个任务快要结束了,它应该检查一下这个环境是否有新的任务在队列中等待,如果有,就可以跳过它的同步阶段。

那么,最好的实现方式是什么呢?

一些想法:

  • 我觉得我们需要设置多个队列:每个环境一个队列,并且让个celery工作者专门处理一个队列。这样可以解决第1和第2个要求。
    但是,怎么让多个celery工作者只监听不同的队列呢?
  • 有没有什么简单的方法可以知道某个环境的队列中还有更多任务在等待?

2 个回答

1

我建议你看看zeromq。这个库可以同时处理消息传递和多线程,而且速度非常快。它还支持很多编程语言,并且自带负载均衡的功能。

2

对于1和2,可以使用多个队列,并通过 -Q 参数来指定要监听哪个队列,启动工作者。

另外,设置 CELERYD_PREFETCH_MULTIPLIER = 1,这样每次只处理一个任务。

要获取队列的长度(经过RabbitMQ测试),你可以使用类似下面的代码:

from kombu.connection import BrokerConnection
connection = BrokerConnection(BROKER_HOST, BROKER_USER...)
channel = connection.channel()
q, j, c = channel.queue_declare('celery', passive=True)
print 'celery %d jobs in queue' % j

使用 'queue_declare' 这个操作,可以得到队列的长度。

希望这对你有帮助。

撰写回答