在Celery中,如何避免长时间延迟的任务阻塞新任务?

13 投票
2 回答
6088 浏览
提问于 2025-04-18 06:25

我有两种任务。任务A是由celerybeat每小时生成一次的。它会立即运行,并生成一千个(或者更多)任务B,每个任务B的预计执行时间(ETA)是未来的一天。

当任务A启动时,它会生成一千个任务B。从那以后,就没有任何事情发生。我应该每小时看到另一个任务A在运行,并生成一千个任务B。但实际上我什么也没看到。

在系统冻结的时候,rabbitmqctl显示有1000条消息,其中968条是准备好的,32条是未确认的。一个小时后,消息数量变成1001条,969条准备好,32条未确认。每小时都会有一条新消息被标记为准备好。推测发生的情况是,工作进程预取了32条消息,但无法处理它们,因为它们的ETA仍然在未来。与此同时,应该立即运行的新任务却无法执行。

处理这个问题的正确方法是什么?我猜我需要多个工作进程,也许还需要多个队列(但我对后者不太确定)。有没有更简单的方法?我尝试调整CELERYD_PREFETCH_MULTIPLIER和-Ofail(在这里讨论过:http://celery.readthedocs.org/en/latest/userguide/optimizing.html),但没有成功。我的问题是不是和这个一样:[[Django Celery]] Celery在执行IO任务时被阻塞

无论如何:我之所以能解决这个问题,是因为我对任务的性质和时间安排了解很多。难道不觉得这是一个设计缺陷吗?足够多的未来ETA任务会锁住整个系统?如果我等几个小时,然后杀掉并重启工作进程,它又会抓取前32个任务并冻结,尽管此时队列中有任务是可以立即运行的。难道没有哪个组件足够聪明,可以查看ETA并忽略那些不能运行的任务吗?

附录:我现在认为这个问题是RabbitMQ 3.3与Celery 3.1.0一起使用时的一个已知bug。更多信息在这里: https://groups.google.com/forum/#!searchin/celery-users/countdown|sort:date/celery-users/FiAAESOzezA/499OH-pylacJ

在更新到Celery 3.1.1后,情况似乎有所改善。任务A每小时运行一次(嗯,已经运行了几个小时),并安排它的任务B副本。那些似乎正在填满工作进程:未确认的消息数量继续增加。我得看看它是否会无限增长。

2 个回答

1

你现在有多少个工作线程在运行,处理的并发量是多少呢?

增加工作线程的并发量可能会对解决问题有帮助。如果某个线程(比如线程x)在处理一个耗时很长的任务A,或者处于等待状态,那么其他线程就可以去处理那些提前准备好的任务。

http://celery.readthedocs.io/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-c

2

看起来这个问题可以通过路由来解决:

使用路由的时候,你可以有多个队列,每个队列里放不同类型的任务。如果你希望任务B不会影响到任务A的执行,你可以把它们放在不同的工作队列里,并设置不同的优先级。这样,你的工作者就会先处理满是任务B的大队列,但一旦有任务A到来,最近空闲的工作者就会去处理这个任务A。

这样做的好处是,你还可以给那些任务很多的队列分配更多的工作者,这些工作者只会从指定的高负载队列中取任务。

撰写回答