App Engine上的后台任务
我该如何在App Engine上运行后台任务呢?
8 个回答
你可以在这里了解更多关于Python App Engine中的定时任务(cron jobs)的内容:这里。
GAE(Google App Engine)是一个非常有用的工具,可以用来构建可扩展的网页应用。不过,很多人提到的一些限制包括:不支持后台任务、缺少定时任务,以及对每个HTTP请求的处理时间有严格限制。如果一个请求超出了这个时间限制,操作就会被终止,这使得运行耗时的任务变得不可能。
如何运行后台任务?
在GAE中,代码只有在有HTTP请求时才会执行。而且代码的执行时间有严格的限制(我记得是10秒)。所以如果没有请求,代码就不会被执行。一个建议的解决办法是使用外部服务器不断发送请求,这样就可以算作是创建了一个后台任务。但这样我们就需要依赖一个外部的服务器。另一个替代方案是发送302重定向响应,让客户端重新发送请求,这也让我们依赖于外部元素,也就是客户端。那么,如果这个外部服务器就是GAE本身呢?使用过不支持循环结构的函数式编程语言的人都知道,递归可以替代循环。那么,如果我们完成一部分计算后,再对同一个网址进行HTTP GET请求,设置一个很短的超时时间,比如1秒,这样就会在运行的PHP代码中形成一个循环(递归)。
<?php $i = 0; if(isset($_REQUEST["i"])){ $i= $_REQUEST["i"]; sleep(1); } $ch = curl_init("http://localhost".$_SERVER["PHP_SELF"]."?i=".($i+1)); curl_setopt($ch, CURLOPT_HEADER, 0); curl_setopt($ch, CURLOPT_TIMEOUT, 1); curl_exec($ch); print "hello world\n"; ?>
但在GAE上,这似乎不太管用。那么,如果我们对另一个网址,比如url2进行HTTP GET请求,而这个url2又对第一个网址进行HTTP GET请求呢?这在GAE上似乎是可行的。代码大致是这样的。
class FirstUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url2') class SecondUrl(webapp.RequestHandler): def get(self): self.response.out.write("ok") time.sleep(2) urlfetch.fetch("http://"+self.request.headers["HOST"]+'/url1') application = webapp.WSGIApplication([('/url1', FirstUrl), ('/url2', SecondUrl)]) def main(): run_wsgi_app(application) if __name__ == "__main__": main()
既然我们找到了运行后台任务的方法,那我们就来构建定时任务(timer)和跨多个HTTP请求的循环结构(foreach)。
定时器
构建定时器其实很简单。基本的想法是维护一个定时器列表,以及每个定时器应该被调用的时间间隔。一旦达到这个时间间隔,就调用回调函数。我们会使用memcache来保存定时器列表。为了找出何时调用回调,我们会在memcache中存储一个键,设置过期时间为时间间隔。我们会定期(比如每5秒)检查这个键是否存在,如果不存在,就调用回调函数,然后再次设置这个键的时间间隔。
def timer(func, interval): timerlist = memcache.get('timer') if(None == timerlist): timerlist = [] timerlist.append({'func':func, 'interval':interval}) memcache.set('timer-'+func, '1', interval) memcache.set('timer', timerlist) def checktimers(): timerlist = memcache.get('timer') if(None == timerlist): return False for current in timerlist: if(None == memcache.get('timer-'+current['func'])): #reset interval memcache.set('timer-'+current['func'], '1', current['interval']) #invoke callback function try: eval(current['func']+'()') except: pass return True return False
Foreach
当我们需要进行耗时的计算,比如对1000条数据库记录进行操作或获取1000个网址等,就需要这个功能。基本的想法是将回调函数和参数保存在memcache中,每次调用回调时传入参数。
def foreach(func, args): looplist = memcache.get('foreach') if(None == looplist): looplist = [] looplist.append({'func':func, 'args':args}) memcache.set('foreach', looplist) def checkloops(): looplist = memcache.get('foreach') if(None == looplist): return False if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): arg = looplist[0]['args'].pop(0) func = looplist[0]['func'] if(len(looplist[0]['args']) == 0): looplist.pop(0) if((len(looplist) > 0) and (len(looplist[0]['args']) > 0)): memcache.set('foreach', looplist) else: memcache.delete('foreach') try: eval(func+'('+repr(arg)+')') except: pass return True else: return False # instead of # foreach index in range(0, 1000): # someoperaton(index) # we will say # foreach('someoperaton', range(0, 1000))
现在,构建一个每小时获取网址列表的程序是很简单的。代码如下。
def getone(url): try: result = urlfetch.fetch(url) if(result.status_code == 200): memcache.set(url, '1', 60*60) #process result.content except : pass def getallurl(): #list of urls to be fetched urllist = ['http://www.google.com/', 'http://www.cnn.com/', 'http://www.yahoo.com', 'http://news.google.com'] fetchlist = [] for url in urllist: if (memcache.get(url) is None): fetchlist.append(url) #this is equivalent to #for url in fetchlist: getone(url) if(len(fetchlist) > 0): foreach('getone', fetchlist) #register the timer callback timer('getallurl', 3*60)
完整代码在这里 http://groups.google.com/group/httpmr-discuss/t/1648611a54c01aa。我在appengine上运行这段代码已经几天了,没遇到什么问题。
警告:我们大量使用urlfetch。每天的urlfetch数量限制是160000。所以要小心不要达到这个限制。
你可以使用 任务队列的Python接口。