通知Celery任务工作进程关闭

10 投票
2 回答
7818 浏览
提问于 2025-04-17 06:22

我正在使用celery 2.4.1,搭配python 2.6,后端是rabbitmq,还有django。我希望我的任务在工作进程关闭时能够正确清理。根据我所知道的,无法提供任务的清理函数,所以我尝试连接到worker_shutdown信号。

注意:AbortableTask 只适用于数据库后端,所以我不能使用它。

from celery.signals import worker_shutdown

@task
def mytask(*args)

  obj = DoStuff()

  def shutdown_hook(*args):
     print "Worker shutting down"
     # cleanup nicely
     obj.stop()

  worker_shutdown.connect(shutdown_hook)

  # blocking call that monitors a network connection
  obj.stuff()

但是,关闭钩子从来没有被调用。按Ctrl-C关闭工作进程并不会终止任务,我必须手动从命令行中杀掉它。

所以如果这不是正确的方法,我该如何让任务优雅地关闭呢?

2 个回答

0

使用 worker_shutting_down 信号。
可以查看这个链接了解更多信息:https://docs.celeryq.dev/en/stable/userguide/signals.html#worker-shutting-down,还有这个链接提供的例子:https://stackoverflow.com/a/55481656/237091

11

worker_shutdown 这个信号是由 MainProcess 发出的,而不是由子进程的工作者发出的。所有以 worker_* 开头的信号,除了 worker_process_init,都是指 MainProcess

不过,关闭的钩子从来没有被调用。按 Ctrl-C 结束工作者并不会杀掉任务,我必须手动在命令行中结束它。

在正常的(温和的)关闭情况下,工作者不会终止任务。即使一个任务需要几天才能完成,工作者也不会结束关闭,直到这个任务完成为止。你可以设置 --soft-time-limit--time-limit 来告诉系统什么时候可以结束这个任务。

所以,要添加任何清理过程,你首先需要确保任务能够真正完成。因为在任务完成之前,清理过程是不会被调用的。

要为工作者进程添加清理步骤,你可以使用类似下面的代码:

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code here (see http://docs.python.org/library/signal.html)

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)

撰写回答