Django Celery: 如何以编程方式设置任务在特定间隔运行

7 投票
4 回答
6876 浏览
提问于 2025-04-17 00:15

我发现可以在特定的时间间隔内设置任务运行,具体可以参考这里,不过那是在任务声明的时候做的。那我该怎么动态地设置一个任务定期运行呢?

4 个回答

0

可以查看这里 http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

我觉得你不能动态地创建任务……最好的办法是创建一个任务来执行另一个任务 :D

比如说,你想在 X 秒后运行某个东西,那你就创建一个新的任务,设置延迟为 X 秒,然后在这个任务里再创建一个延迟为 N*X 秒的任务……

2

celery.task.base.PeriodicTask 这个东西里面有个叫 is_due 的功能,它用来判断下一次任务什么时候该执行。你可以自己改写这个功能,加入你想要的动态运行逻辑。想了解更多,可以查看这里的文档:http://docs.celeryproject.org/en/latest/reference/celery.task.base.html?highlight=is_due#celery.task.base.PeriodicTask.is_due

举个例子:

import random
from celery.task import PeriodicTask

class MyTask(PeriodicTask):

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Running my task")

    def is_due(self, last_run_at):
        # Add your logic for when to run. Mine is random
        if random.random() < 0.5:
            # Run now and ask again in a minute
            return (True, 60)
        else:
            # Don't run now but run in 10 secs
            return (True, 10)
8

这个调度是根据一个设置来的,所以在运行时看起来是不能改变的。

你可能可以通过使用 任务的预计时间(ETA) 来实现你想要的功能。这可以保证你的任务不会在想要的时间之前运行,但并不保证任务会在指定的时间运行——如果工作者在那个时间点忙不过来,任务可能会延迟执行。

如果这个限制对你来说不是问题,你可以写一个任务,先自己运行一下,像这样:

@task
def mytask():
    keep_running = # Boolean, should the task keep running?
    if keep_running:
        run_again = # calculate when to run again
        mytask.apply_async(eta=run_again)
    # ... do the stuff you came here to do ...

这种方法的一个主要缺点是,你依赖任务存储来记住正在进行的任务。如果其中一个任务在启动下一个任务之前失败了,那么这个任务就再也不会运行了。如果你的消息代理没有保存到硬盘上而崩溃了(导致所有正在进行的任务都消失),那么那些任务也不会再运行。

你可以通过某种事务日志和一个定期运行的“保姆”任务来解决这些问题,这个“保姆”任务的工作是找到那些意外失败的重复任务并让它们复活。

如果我要实现你所描述的功能,我想我会这样来处理。

撰写回答