任务管理守护进程

2 投票
1 回答
770 浏览
提问于 2025-04-17 01:59

我需要处理一些需要很长时间(我想大概2-3天)的任务,这些任务涉及到我的Django ORM数据。我四处寻找,但没有找到什么好的解决方案。

django-tasks - http://code.google.com/p/django-tasks/ 的文档不太完善,我也不知道该怎么使用它。

celery - http://ask.github.com/celery/ 对于我的任务来说有点过于复杂了。它适合处理长时间的任务吗?

所以,我需要做的就是从我的数据库中获取所有数据或部分数据,比如:

Entry.objects.all()

然后我需要对每一个QuerySet执行相同的函数。

我想这个过程应该会持续大约2-3天。

所以,也许有人能给我解释一下该怎么构建这个。

附注:目前我只有一个想法,就是使用cron和数据库来存储处理执行的时间线。

1 个回答

1

使用Celery的子任务。这可以让你启动一个需要很长时间才能完成的任务(下面有很多短时间完成的子任务),并且可以在Celery的任务结果存储中很好地记录它的执行状态。额外的好处是,子任务会分散到多个工作进程中,这样你就可以充分利用多核服务器,甚至是多个服务器,从而减少任务的运行时间。

编辑:举个例子:

import time, logging as log
from celery.task import task
from celery.task.sets import TaskSet
from app import Entry

@task(send_error_emails=True)
def long_running_analysis():
    entries = list(Entry.objects.all().values('id'))
    num_entries = len(entries)
    taskset = TaskSet(analyse_entry.subtask(entry.id) for entry in entries)
    results = taskset.apply_async()
    while not results.ready()
        time.sleep(10000)
        print log.info("long_running_analysis is %d% complete",
                       completed_count()*100/num_entries)
    if results.failed():
        log.error("Analysis Failed!")
    result_set = results.join() # brings back results in 
                                # the order of entries
    #perform collating or count or percentage calculations here
    log.error("Analysis Complete!")

@task
def analyse_entry(id): # inputs must be serialisable
    logger = analyse_entry.get_logger()
    entry = Entry.objects.get(id=id)
    try:
        analysis = entry.analyse()
        logger.info("'%s' found to be %s.", entry, analysis['status'])
        return analysis # must be a dict or serialisable.
    except Exception as e:
        logger.error("Could not process '%s': %s", entry, e)
        return None 

如果你的计算不能分成每个条目的任务,你可以设置一个子任务来进行统计,另一个子任务来进行其他类型的分析。这样仍然可以正常工作,并且仍然能让你享受到并行处理的好处。

撰写回答