Google App Engine:如何使用任务队列进行处理?

3 投票
2 回答
1988 浏览
提问于 2025-04-16 01:08

我正在使用Python的GAE SDK。

我需要处理6000多个MyKind实例。一次请求处理太慢,所以我决定使用任务队列。如果我让每个任务只处理一个实体,那应该只需要几秒钟。

文档上说一次“批量”最多只能添加100个任务。(这里的“批量”是什么意思?是在一次请求中?还是在一个任务中?)

假设“批量”指的是“请求”,我在想怎么才能为数据存储中的每个实体创建一个任务。你觉得怎么样?

如果我可以假设MyKind的顺序永远不会改变,那就简单多了。(处理过程不会改变MyKind实例,只会创建其他类型的新实例。)我可以创建一堆任务,每个任务指定一个起始位置,间隔少于100。然后,每个任务可以创建实际处理的单独任务。

但如果实体太多,原始请求无法添加所有必要的调度任务呢?这让我觉得需要一个递归的解决方案——每个任务查看它所给定的范围。如果范围内只有一个元素,它就处理这个元素。否则,它会进一步细分这个范围,创建后续任务。

如果我不能依靠使用偏移量和限制来识别实体(因为它们的顺序不一定是固定的),那我是不是可以直接用它们的键呢?但这样的话,我可能会传递成千上万的键,这样似乎不太方便。

我这样做的方向对吗,还是说我应该考虑其他设计方案?

2 个回答

0

根据你的设计,你可以像我一样给所有需要处理的记录编号。我处理大约3500个项目,每个项目大约需要3秒钟。为了避免重叠、超时,并考虑到未来的扩展,我的第一个任务是从数据库中获取所有独特项目的列表。然后,我把这些项目分成每组500个的列表,循环处理,直到所有独特项目都被计算在内,并把每组500个标识符发送到第二层处理任务。

每个第二层处理任务,目前大约有七八个不同的任务,都会有一个独特的500个项目的列表,每个处理任务会为每个独特的标识符添加500个任务。

因为这一切都是通过循环和根据数据库中独特项目的数量来管理的,所以我可以随意添加独特项目,任务的数量会自动增加,绝对不会重复。我每天用这个来跟踪游戏中的价格,所以它是通过定时任务自动运行的,我根本不需要干预。

9

当你运行像 taskqueue.add(url='/worker', params={'cursor': cursor}) 这样的代码时,你是在排队一个任务;也就是说,你在安排一个请求,让它在后台执行,并使用你提供的参数。看起来你一次可以安排多达100个这样的任务。

不过,我觉得你可能不想这样做。使用任务链会让事情简单很多:

你的工作任务可以这样做:

  • 运行一个查询来获取一些记录进行处理。如果在任务参数中提供了游标,就用它。限制查询结果为10条记录,或者你认为能在30秒内完成的数量。

  • 处理这10条记录。

  • 如果你的查询返回了10条记录,排队另一个任务,并把查询中更新的游标传给它,这样它就可以接着你上次的进度继续。

  • 如果你得到的记录少于10条,那就完成了。太好了!发个邮件或者做点别的事情,然后结束。

通过这种方式,你只需要启动第一个任务,后面的任务会自动添加。

需要注意的是,如果一个任务失败了,App Engine会一直重试,直到它成功,所以你不用担心数据存储的问题导致某个任务超时而打断任务链。

补充:

以上步骤并不能保证每个实体只会被处理一次。任务一般应该只运行一次,但谷歌建议你设计时要考虑到幂等性。如果这是一个主要问题,这里有一种处理方式:

  • 在每个要处理的实体上加一个状态标志,或者创建一个补充实体来保存这个标志。它应该有类似“待处理”、“处理中”和“已处理”的状态。

  • 当你获取一个新的实体进行处理时,进行事务锁定并增加处理标志。只有当状态是“待处理”时才运行这个实体。处理完成后,再次增加标志。

注意,在你开始之前,并不一定要在每个实体上添加处理标志。你的“待处理”状态可以仅仅意味着这个属性或对应的实体还不存在。

撰写回答