任务管理守护进程
我需要处理一些需要很长时间(我想大概2-3天)的任务,这些任务涉及到我的Django ORM数据。我四处寻找,但没有找到什么好的解决方案。
django-tasks - http://code.google.com/p/django-tasks/ 的文档不太完善,我也不知道该怎么使用它。
celery - http://ask.github.com/celery/ 对于我的任务来说有点过于复杂了。它适合处理长时间的任务吗?
所以,我需要做的就是从我的数据库中获取所有数据或部分数据,比如:
Entry.objects.all()
然后我需要对每一个QuerySet执行相同的函数。
我想这个过程应该会持续大约2-3天。
所以,也许有人能给我解释一下该怎么构建这个。
附注:目前我只有一个想法,就是使用cron和数据库来存储处理执行的时间线。
1 个回答
1
使用Celery的子任务。这可以让你启动一个需要很长时间才能完成的任务(下面有很多短时间完成的子任务),并且可以在Celery的任务结果存储中很好地记录它的执行状态。额外的好处是,子任务会分散到多个工作进程中,这样你就可以充分利用多核服务器,甚至是多个服务器,从而减少任务的运行时间。
- http://ask.github.com/celery/userguide/tasksets.html#task-sets
- http://docs.celeryproject.org/en/latest/reference/celery.task.sets.html
编辑:举个例子:
import time, logging as log
from celery.task import task
from celery.task.sets import TaskSet
from app import Entry
@task(send_error_emails=True)
def long_running_analysis():
entries = list(Entry.objects.all().values('id'))
num_entries = len(entries)
taskset = TaskSet(analyse_entry.subtask(entry.id) for entry in entries)
results = taskset.apply_async()
while not results.ready()
time.sleep(10000)
print log.info("long_running_analysis is %d% complete",
completed_count()*100/num_entries)
if results.failed():
log.error("Analysis Failed!")
result_set = results.join() # brings back results in
# the order of entries
#perform collating or count or percentage calculations here
log.error("Analysis Complete!")
@task
def analyse_entry(id): # inputs must be serialisable
logger = analyse_entry.get_logger()
entry = Entry.objects.get(id=id)
try:
analysis = entry.analyse()
logger.info("'%s' found to be %s.", entry, analysis['status'])
return analysis # must be a dict or serialisable.
except Exception as e:
logger.error("Could not process '%s': %s", entry, e)
return None
如果你的计算不能分成每个条目的任务,你可以设置一个子任务来进行统计,另一个子任务来进行其他类型的分析。这样仍然可以正常工作,并且仍然能让你享受到并行处理的好处。