在Redis中使用Celery的任务优先级

16 投票
2 回答
13054 浏览
提问于 2025-04-17 18:05

我想用celery来实现一个分布式的任务执行系统。因为rabbitMQ不支持优先级,而我非常需要这个功能,所以我选择了celery+redis。

在我的情况下,任务和硬件关系很紧密,比如任务A只能在Worker 1上运行,因为只有Worker 1的电脑有必要的硬件。我把每个worker的并发数设置为1,这样每次一个worker只能运行一个任务。每个任务大约需要2分钟。

为了实现优先级功能,首先我尝试在调用apply_async()时添加priority参数,比如apply_async(priority=0)apply_async(priority=9)。在这个测试中,我只启动了一个并发数为1的Worker,并依次启动了10个不同优先级的任务。我原本期待apply_async(priority=0)启动的任务能优先执行,但不幸的是,它们只是按照启动的顺序开始执行。

然后我尝试了一些变通的方法。我克隆了每个任务,所以每个任务都有一个高优先级和一个低优先级,分别用@celery.task(priority=0)@celery.task(priority=1)装饰。然后我做了和之前一样的测试,这次效果稍微好一些,当启动顺序是“HH-LLLL-HHHH”时,实际执行的顺序变成了“HH-L-H-H-L-H-L-L-H”。我猜redis在这里做了一些调度和平衡的工作。

但这仍然不能满足我的期望。我希望能得到像“HHHHHH-LLLL”这样的顺序,因为对于某些任务,我只有一台合适的机器有必要的硬件,希望高优先级的任务能尽快运行。

我在网上搜索了其他的变通方法,比如使用两个队列,一个用于高优先级任务,另一个用于低优先级任务,并为前者使用2台机器,为后者使用1台机器。但由于我的硬件非常有限,这对我来说行不通。

你能给我一些建议吗?

2 个回答

4

关于Celery和Redis消息优先级的文档可以在这里找到:redis-message-priorities,你可以自定义优先级等级。以10为例:

  1. 设置优先级步骤的传输选项
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'queue_order_strategy': 'priority',
}
  1. 以正常方式启动Celery工作进程
celery -A tasks worker --loglevel=info
  1. 调用任务时,0是最高优先级,9是最低优先级
custom_priority=5 
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},priority=custom_priority)
22

Celery使用Redis作为传输工具时,会考虑优先级这个字段,但实际上Redis并没有优先级的概念。

优先级的支持是通过为每个队列创建多个列表来实现的,然后在BRPOP命令中使用这个顺序。我这里提到的个列表是因为虽然有10个(0-9)优先级等级,但默认情况下为了节省资源,这些优先级会合并成4个等级。这意味着一个名为celery的队列实际上会被分成4个队列:

['celery0', 'celery3`, `celery6`, `celery9`]

如果你想要更多的优先级等级,可以设置priority_steps这个传输选项:

BROKER_TRANSPORT_OPTIONS = {
    'priority_steps': list(range(10)),
}

不过要注意,这种方式的优先级支持永远无法和服务器级别的优先级相比,最多也只是近似而已。但对于你的应用来说,这可能已经足够用了。

来源。

撰写回答