在Google App Engine中使用MapReduce的简单反例

9 投票
2 回答
3910 浏览
提问于 2025-04-16 17:57

我对GAE(谷歌应用引擎)中mapreduce的支持情况有点困惑。根据文档,http://code.google.com/p/appengine-mapreduce/,目前还不支持reduce阶段,但在2011年I/O会议的描述中(http://www.youtube.com/watch?v=EIxelKcyCC0)却写着“现在可以在App Engine上运行完整的Map Reduce作业”。我想知道我是否可以在这个任务中使用mapreduce:

我想做的事情:

我有一个名为Car的模型,里面有一个颜色字段:

class Car(db.Model):
    color = db.StringProperty()

我想定期(通过cron定义)运行mapreduce过程,计算每种颜色的汽车数量,并将这个结果存储在数据存储中。这个任务似乎非常适合用mapreduce来完成(如果我错了请纠正我),在“map”阶段会为每个Car实体生成一对(颜色, 1),而“reduce”阶段应该根据颜色名称合并这些数据,给我想要的结果。最终我想得到的是存储在数据存储中的计算结果实体,大概是这样的:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()

问题:

我不知道如何在appengine中实现这个... 视频中展示了定义map和reduce函数的例子,但它们似乎是一些非常通用的例子,并没有涉及到数据存储。我找到的其他例子都是用一个函数来处理来自DatastoreInputReader的数据,但它们似乎只涉及“map”阶段,没有关于如何进行“reduce”(以及如何将reduce结果存储到数据存储中的例子)。

2 个回答

9

其实你并不一定需要一个“归约阶段”。你可以通过一系列线性的任务来完成这个工作,基本上可以这样做:

def count_colors(limit=100, totals={}, cursor=None):
  query = Car.all()
  if cursor:
    query.with_cursor(cursor)
  cars = query.fetch(limit)
  for car in cars:
    try:
      totals[car.color] += 1
    except KeyError:
      totals[car.color] = 1
  if len(cars) == limit:
    cursor = query.cursor()
    return deferred.defer(count_colors, limit, totals, cursor)
  entities = []
  for color in totals:
    entity = CarsByColor(key_name=color)
    entity.cars_num = totals[color]
    entities.append(entity)
  db.put(entities)

deferred.defer(count_colors)

这个过程会遍历你所有的汽车,把一个查询游标和一个正在进行的总数传递给一系列临时任务,最后把总数存储起来。

如果你需要合并来自多个数据存储、多个模型或在单一模型中多个索引的数据,那么“归约阶段”可能会有意义。不过就目前的情况来看,我觉得这样做并不会给你带来什么好处。

还有一个选择:使用任务队列来维护每种颜色的实时计数。当你创建一辆车时,启动一个任务来增加该颜色的总数。当你更新一辆车时,启动一个任务来减少旧颜色的计数,同时再启动一个任务来增加新颜色的计数。为了避免竞争条件,更新计数时要确保事务性。

6

我在这里分享一个我最终找到的解决方案,使用的是GAE中的mapreduce(没有reduce阶段)。如果我从头开始,我可能会选择Drew Sears提供的方案。

这个方案在GAE python 1.5.0中可以运行。

app.yaml文件中,我添加了mapreduce的处理程序:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

还有我自己代码的mapreduce处理程序(我使用的url是/mapred_update,用来收集mapreduce产生的结果):

- url: /mapred_.*
  script: mapred.py

我创建了mapreduce.yaml来处理汽车实体:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

解释一下:done_callback是一个在mapreduce完成操作后被调用的url。mapred.process是一个处理单个实体并更新计数的函数(它在mapred.py文件中定义)。Car模型是在models.py中定义的。

mapred.py

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create stats models CarsByColor based on the data 
    gathered by mapreduce counters"""
    def post(self):
        """Called after mapreduce operation are finished"""
        # Finished mapreduce job id is passed in request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
                                     [('/mapred_update', UpdateCounters)],
                                     debug=True)
def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()            

与问题中的定义相比,CarsByColor模型有些许变化。

你可以通过这个url手动启动mapreduce任务:http://yourapp/mapreduce/,希望也可以通过cron来启动(我还没有测试过cron)。

撰写回答