Celery:如何获取任务最后运行时间?
使用celery时,能否在一个任务内部检查这个任务上次运行的时间呢?
我想实现一个功能,就是“抓取自上次运行以来的所有内容”。我可以自己记录上次运行的时间戳,或者希望能从celery那里获取这个信息。
这是在一个django项目中,如果这有影响的话。
3 个回答
2
在最新版本的celery(5.1.2)中,可以通过这段代码获取一个任务的最后运行时间。
#tasks.py
from django_celery_beat.models import PeriodicTask
@shared_task(bind=True)
def backup_everyday(self)
try:
last_run = PeriodicTask.objects.get(task=self.name).last_run_at
except:
last_run = None
“backup everyday”只是一个示例函数。PeriodicTask模型有一个叫“last_run_at”的属性,用来存储任务上一次执行的时间。
2
如果你在使用django-celery,这里的last_run会保存在模型里。你可以用普通的django ORM来根据你的任务名称过滤PeriodicTask模型。
from djcelery.models import PeriodicTask
last_run = PeriodicTask.objects.only('last_run_at').get(task='TASK_NAME_HERE').last_run_at
3
默认情况下,Celery并没有永久保存任务结果的功能。如果你在使用beat模式,系统会自动运行一些清理过程来清理任务结果和执行信息。
我建议你使用NoSQL数据库来存储每次任务的最后执行日期。你可以通过在你的任务中重写after_return这个方法来实现。
def after_return(self, status, retval, task_id, args, kwargs, einfo):
#exit point for context managers
self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)
这个方法会在每次任务结束时被调用,无论结果是什么。你可以通过检查任务的状态,只在任务成功完成时保存日期,或者根据你的需求实现最合适的行为。