通知芹菜任务关闭工人

2024-05-17 15:23:02 发布

您现在位置:Python中文网/ 问答频道 /正文

我将芹菜2.4.1与python 2.6、rabbitmq后端和django一起使用。如果工人停工,我希望我的任务能够正常地清理。据我所知,您不能提供任务析构函数,所以我尝试连接到worker_shutdown信号。

注意:AbortableTask只适用于数据库后端,所以我不能使用它。

from celery.signals import worker_shutdown

@task
def mytask(*args)

  obj = DoStuff()

  def shutdown_hook(*args):
     print "Worker shutting down"
     # cleanup nicely
     obj.stop()

  worker_shutdown.connect(shutdown_hook)

  # blocking call that monitors a network connection
  obj.stuff()

然而,关闭钩子永远不会被调用。Ctrl-C'ing的工人不会杀死任务,我必须手动从外壳杀死它。

因此,如果这不是正确的方法,我如何允许任务正常关闭?


Tags: django函数from数据库obj信号defrabbitmq
1条回答
网友
1楼 · 发布于 2024-05-17 15:23:02

worker_shutdown只由MainProcess发送,而不是由子池工作进程发送。 所有worker_*信号except for worker_process_init,参考MainProcess

However, the shutdown hook never gets called. Ctrl-C'ing the worker doesn't kill the task and I have to manually kill it from the shell.

在正常(热)关机状态下,工作程序从不终止任务。 即使一个任务需要几天才能完成,工人也不会完成关闭 直到它完成。您可以将--soft-time-limit,或--time-limit设置为 告诉实例何时可以终止任务。

因此,要添加任何类型的进程清理进程,首先需要 确保任务能够真正完成。因为清理工作不会 在那之前打电话。

要将清理步骤添加到池工作进程,可以使用 类似于:

from celery import platforms
from celery.signals import worker_process_init

def cleanup_after_tasks(signum, frame):
    # reentrant code here (see http://docs.python.org/library/signal.html)

def install_pool_process_sighandlers(**kwargs):
    platforms.signals["TERM"] = cleanup_after_tasks
    platforms.signals["INT"] = cleanup_after_tasks

worker_process_init.connect(install_pool_process_sighandlers)

相关问题 更多 >