在Redis中使用Celery的任务优先级
我想用celery来实现一个分布式的任务执行系统。因为rabbitMQ不支持优先级,而我非常需要这个功能,所以我选择了celery+redis。
在我的情况下,任务和硬件关系很紧密,比如任务A只能在Worker 1上运行,因为只有Worker 1的电脑有必要的硬件。我把每个worker的并发数设置为1,这样每次一个worker只能运行一个任务。每个任务大约需要2分钟。
为了实现优先级功能,首先我尝试在调用apply_async()
时添加priority
参数,比如apply_async(priority=0)
和apply_async(priority=9)
。在这个测试中,我只启动了一个并发数为1的Worker,并依次启动了10个不同优先级的任务。我原本期待apply_async(priority=0)
启动的任务能优先执行,但不幸的是,它们只是按照启动的顺序开始执行。
然后我尝试了一些变通的方法。我克隆了每个任务,所以每个任务都有一个高优先级和一个低优先级,分别用@celery.task(priority=0)
和@celery.task(priority=1)
装饰。然后我做了和之前一样的测试,这次效果稍微好一些,当启动顺序是“HH-LLLL-HHHH”时,实际执行的顺序变成了“HH-L-H-H-L-H-L-L-H”。我猜redis在这里做了一些调度和平衡的工作。
但这仍然不能满足我的期望。我希望能得到像“HHHHHH-LLLL”这样的顺序,因为对于某些任务,我只有一台合适的机器有必要的硬件,希望高优先级的任务能尽快运行。
我在网上搜索了其他的变通方法,比如使用两个队列,一个用于高优先级任务,另一个用于低优先级任务,并为前者使用2台机器,为后者使用1台机器。但由于我的硬件非常有限,这对我来说行不通。
你能给我一些建议吗?
2 个回答
关于Celery和Redis消息优先级的文档可以在这里找到:redis-message-priorities,你可以自定义优先级等级。以10为例:
- 设置优先级步骤的传输选项
app.conf.broker_transport_options = {
'priority_steps': list(range(10)),
'queue_order_strategy': 'priority',
}
- 以正常方式启动Celery工作进程
celery -A tasks worker --loglevel=info
- 调用任务时,0是最高优先级,9是最低优先级
custom_priority=5
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'},priority=custom_priority)
Celery使用Redis作为传输工具时,会考虑优先级这个字段,但实际上Redis并没有优先级的概念。
优先级的支持是通过为每个队列创建多个列表来实现的,然后在BRPOP命令中使用这个顺序。我这里提到的celery
的队列实际上会被分成4个队列:
['celery0', 'celery3`, `celery6`, `celery9`]
如果你想要更多的优先级等级,可以设置priority_steps
这个传输选项:
BROKER_TRANSPORT_OPTIONS = {
'priority_steps': list(range(10)),
}
不过要注意,这种方式的优先级支持永远无法和服务器级别的优先级相比,最多也只是近似而已。但对于你的应用来说,这可能已经足够用了。