django,fastcgi:如何管理长时间运行的进程?
我接手了一个使用django和fastcgi的应用程序,需要对它进行修改,以便执行一个可能需要半小时或更长时间的复杂计算。我想做的是在后台运行这个计算,并返回一个“你的任务已经开始”的响应。在计算进行期间,如果有人再次访问这个网址,应该返回“你的任务仍在运行”,直到计算完成,这时再返回计算结果。之后如果再访问这个网址,就应该返回之前缓存的结果。
我对django完全是个新手,十年来没有做过什么重要的网页工作,所以我不知道有没有现成的方法可以实现我想要的功能。我尝试过用subprocess.Popen()来启动这个过程,这样做是可以的,但会在进程表中留下一个无效的条目。我需要一个干净的解决方案,能够在计算完成后清理临时文件和任何进程的痕迹。
我还尝试过使用fork()和线程,但还没有找到一个可行的解决方案。对于我来说,这似乎是一个相当常见的用例,是否有一个标准的解决方案?顺便说一下,这个应用只会在一个内部服务器上使用,流量非常低。
2 个回答
也许你可以换个角度来看这个问题。
你可以试试 DjangoQueueService,然后让一个“守护进程”去监听这个队列,看看有没有新的东西进来并处理它。
我现在也要解决一个类似的问题。这个网站不会是公开的,而是一个内部服务器,流量比较少。
技术限制:
- 所有输入的数据都可以在长时间运行的过程中开始时提供。
- 这个长时间运行的过程不需要用户交互(除了开始时的初始输入)。
- 计算的时间比较长,结果不能立即通过HTTP响应返回给客户端。
- 需要某种反馈(比如进度条)来显示长时间运行过程的状态。
因此,我们至少需要两个网页“视图”:一个用来启动长时间运行的过程,另一个用来监控它的状态或收集结果。
我们还需要某种进程间通信:将用户数据从启动者(即处理HTTP请求的网页服务器)发送到长时间运行的过程,然后再将结果发送给接收者(同样是网页服务器,通过HTTP请求驱动)。前者比较简单,后者就不太明显了。与普通的Unix编程不同,接收者一开始并不知道。接收者可能是一个与启动者不同的进程,而且它可能在长时间运行的任务还在进行中或已经完成时才启动。所以管道不太适用,我们需要保存长时间运行过程的结果。
我看到两种可能的解决方案:
- 将长时间运行的进程的启动任务交给长时间作业管理器(这可能就是上面提到的django-queue-service);
- 将结果永久保存,或者保存在文件中,或者保存在数据库里。
我更倾向于使用临时文件,并在会话数据中记住它们的位置。我觉得这样已经很简单了。
一个作业脚本(就是长时间运行的过程),myjob.py
:
import sys
from time import sleep
i = 0
while i < 1000:
print 'myjob:', i
i=i+1
sleep(0.1)
sys.stdout.flush()
django urls.py
映射:
urlpatterns = patterns('',
(r'^startjob/$', 'mysite.myapp.views.startjob'),
(r'^showjob/$', 'mysite.myapp.views.showjob'),
(r'^rmjob/$', 'mysite.myapp.views.rmjob'),
)
django 视图:
from tempfile import mkstemp
from os import fdopen,unlink,kill
from subprocess import Popen
import signal
def startjob(request):
"""Start a new long running process unless already started."""
if not request.session.has_key('job'):
# create a temporary file to save the resuls
outfd,outname=mkstemp()
request.session['jobfile']=outname
outfile=fdopen(outfd,'a+')
proc=Popen("python myjob.py",shell=True,stdout=outfile)
# remember pid to terminate the job later
request.session['job']=proc.pid
return HttpResponse('A <a href="/showjob/">new job</a> has started.')
def showjob(request):
"""Show the last result of the running job."""
if not request.session.has_key('job'):
return HttpResponse('Not running a job.'+\
'<a href="/startjob/">Start a new one?</a>')
else:
filename=request.session['jobfile']
results=open(filename)
lines=results.readlines()
try:
return HttpResponse(lines[-1]+\
'<p><a href="/rmjob/">Terminate?</a>')
except:
return HttpResponse('No results yet.'+\
'<p><a href="/rmjob/">Terminate?</a>')
return response
def rmjob(request):
"""Terminate the runining job."""
if request.session.has_key('job'):
job=request.session['job']
filename=request.session['jobfile']
try:
kill(job,signal.SIGKILL) # unix only
unlink(filename)
except OSError, e:
pass # probably the job has finished already
del request.session['job']
del request.session['jobfile']
return HttpResponseRedirect('/startjob/') # start a new one