如何设置Celery在运行任务前调用自定义初始化函数?

23 投票
1 回答
13381 浏览
提问于 2025-04-15 18:25

我有一个Django项目,想用Celery来提交任务进行后台处理(http://ask.github.com/celery/introduction.html)。Celery和Django配合得很好,我已经能够提交自定义任务并获取结果了。

不过,我遇到的唯一问题是找不到一个合理的方法来在后台进程中进行自定义初始化。我需要调用一个消耗内存很大的函数,在开始处理任务之前加载很多数据,但我不想每次都调用这个函数。

有没有人遇到过这个问题?有没有什么办法可以解决,而不需要修改Celery的源代码?

谢谢

1 个回答

22

你可以选择自己写一个加载器,或者使用信号。

加载器有一个叫做 on_task_init 的方法,这个方法会在任务即将执行时被调用,还有一个 on_worker_init 方法,它是由 celery 和 celerybeat 的主进程调用的。

使用信号可能是最简单的,下面是可用的信号:

0.8.x 版本:

  • task_prerun(task_id, task, args, kwargs)

    当一个任务即将被工作进程执行时会触发这个信号(如果使用 apply 或者设置了 CELERY_ALWAYS_EAGER,那么就是在本地执行)。

  • task_postrun(task_id, task, args, kwargs, retval)

    在任务执行完毕后触发,条件和上面一样。

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    当一个任务被提交时会调用这个信号(不适合长时间运行的操作)。

在 0.9.x 版本中(当前 GitHub 的主分支)还有额外的信号:

  • worker_init()

    当 celeryd 启动时会调用这个信号(在任务初始化之前,所以如果在支持 fork 的系统上,任何内存的变化都会被复制到子工作进程中)。

  • worker_ready()

    当 celeryd 准备好接收任务时会调用这个信号。

  • worker_shutdown()

    当 celeryd 正在关闭时会调用这个信号。

下面是一个示例,展示如何在任务第一次运行时预先计算一些东西:

from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun

_precalc_table = {}

class PowersOfTwo(Task):

    def run(self, x):
        if x in _precalc_table:
            return _precalc_table[x]
        else:
            return x ** 2
tasks.register(PowersOfTwo)


def _precalc_numbers(**kwargs):
    if not _precalc_table: # it's empty, so haven't been generated yet
        for i in range(1024):
            _precalc_table[i] = i ** 2


# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])

如果你想让这个函数对所有任务都执行,只需跳过 sender 参数。

撰写回答