App Engine 的 cron 任务和任务队列出现 DeadlineExceededError,适用于维基百科爬虫
我正在尝试在谷歌应用引擎上构建一个维基百科链接爬虫。我想把一个索引存储在数据存储中。但是我遇到了DeadlineExceededError这个错误,既在定时任务中,也在任务队列中。
对于定时任务,我有以下代码:
def buildTree(self):
start=time.time()
self.log.info(" Start Time: %f" % start)
nobranches=TreeNode.all()
for tree in nobranches:
if tree.branches==[]:
self.addBranches(tree)
time.sleep(1)
if (time.time()-start) > 10 :
break
self.log.info("Time Eclipsed: %f" % (time.time()-start))
self.log.info(" End Time:%f" % time.clock())
我不明白为什么这个循环在10秒后不停止。在开发服务器上是可以的。服务器上的time.time()可能有问题。我可以用其他函数吗?
对于任务队列,我有以下代码:
def addNewBranch(self, keyword, level=0):
self.log.debug("Add Tree")
self.addBranches(keyword)
t=TreeNode.gql("WHERE name=:1", keyword).get()
branches=t.nodes
if level < 3:
for branch in branches:
if branch.branches == []:
taskqueue.add(url="/addTree/%s" % branch.name)
self.log.debug("url:%s" % "/addTree/%s" % branch.name)
日志显示它们都遇到了DeadlineExceededError。难道后台处理的时间不应该比页面请求的30秒更长吗?有没有办法绕过这个异常?
这是addBranch()的代码:
def addBranches(self, keyword):
tree=TreeNode.gql("WHERE name=:1", keyword).get()
if tree is None:
tree=TreeNode(name=keyword)
self.log.debug("in addBranches arguments: tree %s", tree.name)
t=urllib2.quote(tree.name.encode('utf8'))
s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
self.log.debug(s)
try:
usock = urllib2.urlopen(s)
except :
self.log.error( "Could not retrieve doc: %s" % tree.name)
usock=None
if usock is not None:
try:
xmldoc=minidom.parse(usock)
except Exception , error:
self.log.error("Parse Error: %s" % error)
return None
usock.close()
try:
pyNode= xmldoc.getElementsByTagName('pl')
self.log.debug("Nodes to be added: %d" % pyNode.length)
except Exception, e:
pyNode=None
self.log.error("Getting Nodes Error: %s" % e)
return None
newNodes=[]
if pyNode is not None:
for child in pyNode:
node=None
node= TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()
if node is None:
newNodes.append(TreeNode(name=child.attributes["title"].value))
else:
tree.branches.append(node.key())
db.put(newNodes)
for node in newNodes:
tree.branches.append(node.key())
self.log.debug("Node Added: %s" % node.name)
tree.put()
return tree.branches
4 个回答
当出现DeadlineExcededErrors错误时,你希望如果再次请求,能够最终成功。这可能需要确保你的爬虫状态已经有了一些进展,这样下次就可以跳过这些进展。(这里不详细讨论)
并行调用可以大大帮助你。
- Urlfetch
- Datastore Put(将不同的实体一起放入数据库)
- Datastore Query(并行查询 - 使用asynctools)
Urlfetch:
- 在进行urlfetch调用时,确保使用异步模式,这样可以简化你的循环。
Datastore
将多个实体合并到一次请求中。
# put newNodes+tree at the same time db.put(newNodes+tree)
将TreeNode.gql从循环内部提取到并行查询工具中,比如asynctools http://asynctools.googlecode.com
Asynctools 示例
if pyNode is not None:
runner = AsyncMultiTask()
for child in pyNode:
title = child.attributes["title"].value
query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
runner.append(QueryTask(query, limit=1, client_state=title))
# kick off the work
runner.run()
# peel out the results
treeNodes = []
for task in runner:
task_result = task.get_result() # will raise any exception that occurred for the given query
treeNodes.append(task_result)
for node in treeNodes:
if node is None:
newNodes.append(TreeNode(name=child.attributes["title"].value))
else:
tree.branches.append(node.key())
for node in newNodes:
tree.branches.append(node.key())
self.log.debug("Node Added: %s" % node.name)
# put newNodes+tree at the same time
db.put(newNodes+tree)
return tree.branches
声明:我与asynctools有关系。
这里的问题是,你对文档中的每一个链接都在进行查询操作。由于维基百科的页面可能包含很多链接,这就意味着你要进行很多次查询——这样一来,你就会耗尽处理时间。而且,这种做法还会非常快速地消耗你的配额!
相反,你应该把维基百科页面的名称当作实体的关键名称。然后,你可以把文档中的所有链接收集到一个列表里,从中构建出键(这完全是在本地进行的操作),然后一次性进行批量的 db.get 查询。等你更新和/或创建了这些数据后,可以一次性进行批量的 db.put,把它们都存储到数据存储中——这样一来,你的总数据存储操作次数就从 numlinks*2 减少到仅仅 2 次!
我在GAE上处理日期时间方面取得了很好的效果。
from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start
time_taken会是一个时间差,你可以把它和另一个你感兴趣的时间差进行比较。
ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
....do something quick.
听起来你用mapreduce或者任务队列会更合适。这两种方法都很适合处理大量记录。
你现在的代码可以更简洁一些,只获取部分记录。
nobranches=TreeNode.all().fetch(100)
这段代码只会拉取100条记录。如果你有满满的100条,处理完后可以再往队列里放一个任务,继续拉取更多。
-- 基于关于需要没有分支的树的评论 --
我没有看到你的模型,但如果我想创建一个没有分支的树的列表并处理它们,我会:每次只获取大约100棵树的键。然后,使用一个In查询获取这些树的所有分支。按树的键排序。扫描分支列表,第一次找到某棵树的键时,从列表中提取出这棵树的键。完成后,你就会有一个“没有分支”的树的键的列表。安排每一棵树进行处理。
更简单的做法是对树使用MapReduce。对于每棵树,找到一个与其ID匹配的分支。如果找不到,就标记这棵树以便后续处理。默认情况下,这个功能会批量拉取树(我记得是25棵),并且有8个同时工作的任务。它会自动管理任务队列,所以你不用担心超时的问题。