Django celery在完成前一个实例的设置持续时间后开始定期任务

2024-04-29 15:58:34 发布

您现在位置:Python中文网/ 问答频道 /正文

目前我有一个周期性的任务,每30分钟运行一次。任务本身有时需要30多分钟才能完成。在

如何更改定期任务在上次运行完成后30分钟运行?在

@periodic_task(run_every=timedelta(minutes=30), queue='activities', options={'queue': 'activities'})
def pull_activities_frequent_adaptors():
    adaptors_queryset = TrackingAppAdaptor.objects.adaptors_that_pull_activities_frequently()
    pull_activities_from_adaptors(adaptors_queryset)

Tags: runtaskqueuedefpullactivitiestimedeltaoptions
2条回答

芹菜不支持这种开箱即用的方法,但我以前也做过类似的事情,我不得不自己摸索出一个解决方案。在

根据我的经验,有两种简单明了的方法来实现这一点,两者都需要权衡。也有一些相当大的漏洞,你可以踏进这个东西,所以警告掏空者。在

选项1:

使用一些数据存储来保存有关何时运行任务的信息,并触发celery beat任务。在

要做到这一点,您可以使用您的数据库和一个模型,其中包含有关周期性任务的一些信息。(如果你想获得更多的技术支持,你甚至可以直接与队列对话,也可以跳过模特路线。)

from django.db import models

class PeriodicTask(models.Model):
   lastrun = models.DateTimeField()
   nextrun = models.DateTimeField()
   notes = models.TextField()  # errors?
   task_id = models.CharField(max_length=100)

这只是对模型可能存储的内容有点粗略的了解。您可以将任何有用的东西放在那里,但是您需要一个datetime对象来存储下一次运行的时间。在

接下来,您的定期任务需要更频繁地运行,以便启动并查看是否有任何任务需要尽快执行:

^{pr2}$

潜在问题:

1)如果您有多个具有主从设置的数据库,特别是如果您有滞后,您可能会导致双重调度(即使使用count() == 1部分)。因此,有一个值得思考的种族条件。在

2)很难接近30分钟,因为您必须使用时间窗口来查找要执行的任务。在

3)任务需要比您的时间窗口更频繁地运行,否则您可能会错过它。这可能是对资源的浪费(但我认为这并不是太可怕),因为它通常都在运转,什么也不做。在

4)没有什么比处理日期时间更让人困惑的了,所以你必须真正考虑时区的问题,考虑所有的变化,并测试这段代码。在

5)这是一个很大的问题:如果任务的运行时间比计划的时间间隔长,那么您将有两个任务同时运行,这是一个问题。同样,在比赛条件下,事情会变得很危险。在

选项2)

不要用芹菜打:先完成第一个任务,30分钟后再执行另一个任务。这有可能成为一个出走的巫师学徒类型的东西,所以我觉得有点,嗯,吓人,虽然我已经做了第一个选择,但我从来没有真正说服自己进入以下。但是,不管怎样,我认为这是可以做到的:

@task  # no longer a periodic task
def your_task(args):
    # Whatever you want to do, then call itself again...
    your_task.apply_async(args=(args), countdown=1800)  

现在你只需要在某个地方调用它,可能是在一个每周启动一次的cron作业中,它会杀死这个东西以前的任何版本(它是如何找到它们的?)然后发射第一个。在

我不得不说,我真的不喜欢这个答案,尽管我曾经有过几次这样的经历,但它似乎是一种更危险、更难以控制的解决问题的方法。不过,我很好奇是否有人这么做。在

对不起,你不能这么做。在

由于有多个worker执行该作业,因此您实际上需要确保没有工人正在运行上一个任务。在

你能做什么:

  1. 使用celery backend可以监视是否仍有任务正在进行,而不执行当前任务。这将允许您创建一个只有一个任务被执行的情况。

  2. 当任务完成后,您可以创建一个触发器来发送等待30分钟的新任务,您可以使用ETA

相关问题 更多 >