App Engine 的 cron 任务和任务队列出现 DeadlineExceededError,适用于维基百科爬虫

2 投票
4 回答
1210 浏览
提问于 2025-04-16 05:23

我正在尝试在谷歌应用引擎上构建一个维基百科链接爬虫。我想把一个索引存储在数据存储中。但是我遇到了DeadlineExceededError这个错误,既在定时任务中,也在任务队列中。

对于定时任务,我有以下代码:

def buildTree(self):

    start=time.time()
    self.log.info(" Start Time: %f" % start)
    nobranches=TreeNode.all()       

    for tree in nobranches:            
        if tree.branches==[]:
            self.addBranches(tree)
            time.sleep(1)
        if (time.time()-start) > 10 :                
            break
        self.log.info("Time Eclipsed: %f" % (time.time()-start))

    self.log.info(" End Time:%f" % time.clock())

我不明白为什么这个循环在10秒后不停止。在开发服务器上是可以的。服务器上的time.time()可能有问题。我可以用其他函数吗?

对于任务队列,我有以下代码:
def addNewBranch(self, keyword, level=0):

    self.log.debug("Add Tree")        
    self.addBranches(keyword)

    t=TreeNode.gql("WHERE name=:1", keyword).get()
    branches=t.nodes

    if level < 3:
        for branch in branches:
            if branch.branches == []:
                taskqueue.add(url="/addTree/%s" % branch.name)
                self.log.debug("url:%s" % "/addTree/%s" % branch.name)

日志显示它们都遇到了DeadlineExceededError。难道后台处理的时间不应该比页面请求的30秒更长吗?有没有办法绕过这个异常?

这是addBranch()的代码:

def addBranches(self, keyword):

    tree=TreeNode.gql("WHERE name=:1", keyword).get()
    if tree is None:
        tree=TreeNode(name=keyword)


    self.log.debug("in addBranches arguments: tree %s", tree.name)     
    t=urllib2.quote(tree.name.encode('utf8'))
    s="http://en.wikipedia.org/w/api.php?action=query&titles=%s&prop=links&pllimit=500&format=xml" % t
    self.log.debug(s)
    try:        
        usock = urllib2.urlopen(s)       
    except :        

        self.log.error( "Could not retrieve doc: %s" % tree.name)
        usock=None

    if usock is not None:

        try:
            xmldoc=minidom.parse(usock)
        except Exception , error:
            self.log.error("Parse Error: %s" % error) 
            return None   
        usock.close()            
        try:
            pyNode= xmldoc.getElementsByTagName('pl')  
            self.log.debug("Nodes to be added: %d" % pyNode.length)
        except Exception, e:
            pyNode=None
            self.log.error("Getting Nodes Error: %s" % e)
            return None
        newNodes=[]    
        if pyNode is not None:
            for child in pyNode: 
                node=None             
                node= TreeNode.gql("WHERE name=:1", child.attributes["title"].value).get()

                if node is None:
                    newNodes.append(TreeNode(name=child.attributes["title"].value))           

                else:
                    tree.branches.append(node.key())  
            db.put(newNodes)
            for node in newNodes:
                tree.branches.append(node.key())
                self.log.debug("Node Added: %s" % node.name)                    
            tree.put()
            return tree.branches 

4 个回答

1

当出现DeadlineExcededErrors错误时,你希望如果再次请求,能够最终成功。这可能需要确保你的爬虫状态已经有了一些进展,这样下次就可以跳过这些进展。(这里不详细讨论)

并行调用可以大大帮助你。

  • Urlfetch
  • Datastore Put(将不同的实体一起放入数据库)
  • Datastore Query(并行查询 - 使用asynctools)

Urlfetch:

  • 在进行urlfetch调用时,确保使用异步模式,这样可以简化你的循环。

Datastore

  • 将多个实体合并到一次请求中。

    # put newNodes+tree at the same time
    db.put(newNodes+tree)
    
  • 将TreeNode.gql从循环内部提取到并行查询工具中,比如asynctools http://asynctools.googlecode.com

Asynctools 示例

    if pyNode is not None:

        runner = AsyncMultiTask()
        for child in pyNode:
             title = child.attributes["title"].value
             query = db.GqlQuery("SELECT __key__ FROM TreeNode WHERE name = :1", title)
             runner.append(QueryTask(query, limit=1, client_state=title))

        # kick off the work
        runner.run()

        # peel out the results
        treeNodes = []
        for task in runner:
            task_result = task.get_result() # will raise any exception that occurred for the given query
            treeNodes.append(task_result)

        for node in treeNodes:
            if node is None:
                newNodes.append(TreeNode(name=child.attributes["title"].value))

            else:
                tree.branches.append(node.key())
        for node in newNodes:
            tree.branches.append(node.key())
            self.log.debug("Node Added: %s" % node.name)

        # put newNodes+tree at the same time
        db.put(newNodes+tree)
        return tree.branches

声明:我与asynctools有关系。

1

这里的问题是,你对文档中的每一个链接都在进行查询操作。由于维基百科的页面可能包含很多链接,这就意味着你要进行很多次查询——这样一来,你就会耗尽处理时间。而且,这种做法还会非常快速地消耗你的配额!

相反,你应该把维基百科页面的名称当作实体的关键名称。然后,你可以把文档中的所有链接收集到一个列表里,从中构建出键(这完全是在本地进行的操作),然后一次性进行批量的 db.get 查询。等你更新和/或创建了这些数据后,可以一次性进行批量的 db.put,把它们都存储到数据存储中——这样一来,你的总数据存储操作次数就从 numlinks*2 减少到仅仅 2 次!

2

我在GAE上处理日期时间方面取得了很好的效果。

from datetime import datetime, timedelta
time_start = datetime.now()
time_taken = datetime.now() - time_start

time_taken会是一个时间差,你可以把它和另一个你感兴趣的时间差进行比较。

ten_seconds = timedelta(seconds=10)
if time_taken > ten_seconds:
    ....do something quick.

听起来你用mapreduce或者任务队列会更合适。这两种方法都很适合处理大量记录。

你现在的代码可以更简洁一些,只获取部分记录。

nobranches=TreeNode.all().fetch(100)

这段代码只会拉取100条记录。如果你有满满的100条,处理完后可以再往队列里放一个任务,继续拉取更多。

-- 基于关于需要没有分支的树的评论 --

我没有看到你的模型,但如果我想创建一个没有分支的树的列表并处理它们,我会:每次只获取大约100棵树的键。然后,使用一个In查询获取这些树的所有分支。按树的键排序。扫描分支列表,第一次找到某棵树的键时,从列表中提取出这棵树的键。完成后,你就会有一个“没有分支”的树的键的列表。安排每一棵树进行处理。

更简单的做法是对树使用MapReduce。对于每棵树,找到一个与其ID匹配的分支。如果找不到,就标记这棵树以便后续处理。默认情况下,这个功能会批量拉取树(我记得是25棵),并且有8个同时工作的任务。它会自动管理任务队列,所以你不用担心超时的问题。

撰写回答