通知Celery任务工作进程关闭
我正在使用celery 2.4.1,搭配python 2.6,后端是rabbitmq,还有django。我希望我的任务在工作进程关闭时能够正确清理。根据我所知道的,无法提供任务的清理函数,所以我尝试连接到worker_shutdown信号。
注意:AbortableTask 只适用于数据库后端,所以我不能使用它。
from celery.signals import worker_shutdown
@task
def mytask(*args)
obj = DoStuff()
def shutdown_hook(*args):
print "Worker shutting down"
# cleanup nicely
obj.stop()
worker_shutdown.connect(shutdown_hook)
# blocking call that monitors a network connection
obj.stuff()
但是,关闭钩子从来没有被调用。按Ctrl-C关闭工作进程并不会终止任务,我必须手动从命令行中杀掉它。
所以如果这不是正确的方法,我该如何让任务优雅地关闭呢?
2 个回答
使用 worker_shutting_down 信号。
可以查看这个链接了解更多信息:https://docs.celeryq.dev/en/stable/userguide/signals.html#worker-shutting-down,还有这个链接提供的例子:https://stackoverflow.com/a/55481656/237091。
worker_shutdown
这个信号是由 MainProcess
发出的,而不是由子进程的工作者发出的。所有以 worker_*
开头的信号,除了 worker_process_init
,都是指 MainProcess
。
不过,关闭的钩子从来没有被调用。按 Ctrl-C 结束工作者并不会杀掉任务,我必须手动在命令行中结束它。
在正常的(温和的)关闭情况下,工作者不会终止任务。即使一个任务需要几天才能完成,工作者也不会结束关闭,直到这个任务完成为止。你可以设置 --soft-time-limit
或 --time-limit
来告诉系统什么时候可以结束这个任务。
所以,要添加任何清理过程,你首先需要确保任务能够真正完成。因为在任务完成之前,清理过程是不会被调用的。
要为工作者进程添加清理步骤,你可以使用类似下面的代码:
from celery import platforms
from celery.signals import worker_process_init
def cleanup_after_tasks(signum, frame):
# reentrant code here (see http://docs.python.org/library/signal.html)
def install_pool_process_sighandlers(**kwargs):
platforms.signals["TERM"] = cleanup_after_tasks
platforms.signals["INT"] = cleanup_after_tasks
worker_process_init.connect(install_pool_process_sighandlers)