Python中有没有分布式任务队列可以杀死不合作的挂起任务?

6 投票
3 回答
1794 浏览
提问于 2025-04-17 10:22

基本上,我有很多任务(每批大约1000个),这些任务的执行时间差别很大(从不到一秒到10分钟不等)。我知道如果一个任务执行超过一分钟,我就可以把它终止掉。这些任务是一些数据挖掘模型优化的步骤(但它们之间是独立的),大部分时间都花在某个C语言扩展函数里,所以如果我试图优雅地终止它们,它们是不会配合的。

有没有适合这种情况的分布式任务队列——据我所知,celery允许终止那些愿意配合的任务。但我可能错了。

我最近问过一个类似的问题,关于如何在纯Python中终止挂起的函数,链接在这里:在多线程环境中终止挂起的函数

我想我可以对celery任务进行子类化,让它启动一个新进程,然后执行任务内容,如果执行时间太长就终止它,但这样的话,我可能会被新解释器的初始化开销搞死。

3 个回答

0

当你取消一个celery任务时,可以选择加上一个可选的参数terminate=True

task.revoke(terminate=True)

这可能不完全符合你的需求,因为这个操作不是由任务本身来完成的。不过,你可以考虑扩展任务类,让它能够自己结束,或者设置一个定期清理的任务,去杀掉那些没有按时完成的任务。

0

Pistil 是一个可以管理多个进程的工具,包括可以结束那些不听话的任务。

但是:

  • 它还是测试版软件,虽然它支持的 gunicorn 是个可靠的工具。
  • 我不知道它是怎么处理 1000 个进程的。
  • 进程之间的通信功能还没有,包括的话,你需要自己设置,比如可以用 zeromq

另一种选择是使用 定时信号,这样可以在 36000 秒后引发一个异常。但是如果有其他程序占用了 GIL(全局解释器锁),那么信号就不会被触发,你的 C 程序可能会这样做。

6

Celery 支持 时间限制。你可以用时间限制来终止那些运行时间过长的任务。除了直接终止任务,你还可以使用 软限制,这样可以在任务中处理 SoftTimeLimitExceeded 异常,并且可以优雅地结束任务。

from celery.task import task
from celery.exceptions import SoftTimeLimitExceeded

@task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

撰写回答