在App Engine上任务队列为空时运行函数

5 投票
2 回答
807 浏览
提问于 2025-04-16 15:05

我每天都有一个定时任务,去调用一个API并获取一些数据。对于每一行数据,我会启动一个任务队列来处理这些数据(这涉及到通过其他API查找数据)。一旦这些处理完成,我的数据在接下来的24小时内不会改变,所以我会把它存到缓存里。

有没有办法知道我排队的所有任务什么时候都完成了,这样我就可以缓存数据呢?

目前我用的方式有点乱,我只是安排了两个定时任务,像这样:

class fetchdata(webapp.RequestHandler):
def get(self):
    todaykey = str(date.today())
    memcache.delete(todaykey)
    topsyurl = 'http://otter.topsy.com/search.json?q=site:open.spotify.com/album&window=d&perpage=20'
    f = urllib.urlopen(topsyurl)
    response = f.read()
    f.close()

    d = simplejson.loads(response)
    albums = d['response']['list']
    for album in albums:
        taskqueue.add(url='/spotifyapi/', params={'url':album['url'], 'score':album['score']})

class flushcache(webapp.RequestHandler):
    def get(self):
        todaykey = str(date.today())
        memcache.delete(todaykey)   

然后我的cron.yaml文件看起来是这样的:

- description: gettopsy
  url: /fetchdata/
  schedule: every day 01:00
  timezone: Europe/London

- description: flushcache
  url: /flushcache/
  schedule: every day 01:05
  timezone: Europe/London

简单来说,我是在猜测所有的任务不会超过5分钟来完成,所以我在5分钟后清空缓存,这样可以确保当数据被缓存时,它是完整的。

有没有更好的方法来编写这个代码?感觉我的解决方案不是最好的……

谢谢,
汤姆

2 个回答

2

我在处理同样的问题时发现了这个问题。我想出了一个不同的解决方案,发在这里希望对其他人有用。

这不是你问的直接替代方案,但它是相关的——我的问题是我想知道队列什么时候是空的,因为这意味着一个复杂的后台进程已经完成了。所以我可以用检查一个“死人计时器”来替代检查队列的大小。

死人计时器是一个由某个进程不断重置的计时器。当那个进程结束时,计时器就不会被重置,最终会到期。所以我让所有组成我复杂后台进程的不同任务来重置这个计时器,而不是检查队列是否为空,我设置了一个定时任务去检查计时器是否到期。

当然,为了提高效率,计时器必须避免频繁写入数据存储。这里的代码通过稍微放宽行为,使用内存缓存来保存计时器对象的副本,并且只有在经过一段时间后才在存储中重置它,避免了频繁写入。

顺便说一下,这种方法比你描述的更有效,因为它不需要频繁写入数据库。而且它也更稳健,因为你不需要精确跟踪发生了什么。

6

现在没有办法直接知道你的任务什么时候执行完毕。最好的办法是,在数据存储中插入一些标记记录,每个任务完成后就删除自己的记录。这样,每个任务就可以检查自己是不是最后一个任务,如果是的话,就可以进行清理或者缓存操作。

撰写回答