在Google App Engine上对多个数据存储类型进行MapReduce

6 投票
2 回答
1122 浏览
提问于 2025-04-16 04:26

我刚刚看了2010年Google I/O的App Engine批量数据处理会议,还读了一些Google研究的MapReduce文章,现在我在考虑用Google App Engine上的MapReduce来实现一个推荐系统,使用Python编程。

我更喜欢使用appengine-mapreduce,而不是任务队列API,因为前者可以轻松遍历所有实例,自动进行批处理,自动链接任务等等。不过,我遇到一个问题:我的推荐系统需要计算两个不同模型实例之间的相关性,也就是说要处理两种不同类型的数据。

举个例子: 我有这两个模型:用户(User)和物品(Item)。每个模型都有一个标签列表作为属性。下面是计算用户和物品之间相关性的函数。需要注意的是,calculateCorrelation函数应该对每一对用户和物品组合调用:

def calculateCorrelation(user, item):
    return calculateCorrelationAverage(u.tags, i.tags)

def calculateCorrelationAverage(tags1, tags2):
    correlationSum = 0.0
    for (tag1, tag2) in allCombinations(tags1, tags2):
        correlationSum += correlation(tag1, tag2)
    return correlationSum / (len(tags1) + len(tags2))

def allCombinations(list1, list2):
    combinations = []
    for x in list1:
        for y in list2:
            combinations.append((x, y))
    return combinations             

但是,这个calculateCorrelation并不是appengine-mapreduce中的有效Mapper,可能这个函数甚至不符合MapReduce的计算概念。不过,我需要确认一下……如果能利用appengine-mapreduce的优势,比如自动批处理和任务链接,那就太好了。

有没有什么解决办法呢?

我应该定义自己的InputReader吗?一个新的InputReader可以读取两种不同类型的所有实例,这样能和当前的appengine-mapreduce实现兼容吗?

还是我应该尝试以下方法呢?

  • 将这两种类型的所有实体的所有键两两组合,形成一个新的模型实例(可能使用MapReduce)
  • 使用mappers遍历这个新模型的实例
  • 对于每个实例,使用其中的键获取两种不同类型的实体,并计算它们之间的相关性。

2 个回答

2

如果没有更多关于你在计算什么的细节,很难给出具体的建议。一个简单的选择是在map调用中直接获取相关的实体——在这里进行数据存储操作是没有问题的。

不过,这样会导致很多小的调用。正如你所提到的,编写一个自定义的InputReader可以让你并行获取这两组实体,这样会大大提高性能。

如果你能提供更多关于如何连接这些实体的细节,我们可能能给出更具体的建议。

3

根据Nick Johnson的建议,我写了一个自己的输入读取器。这个读取器可以从两种不同的实体中获取数据。它会生成这些实体的所有组合。下面是代码:

class TwoKindsInputReader(InputReader):
    _APP_PARAM = "_app"
    _KIND1_PARAM = "kind1"
    _KIND2_PARAM = "kind2"
    MAPPER_PARAMS = "mapper_params"

    def __init__(self, reader1, reader2):
        self._reader1 = reader1
        self._reader2 = reader2

    def __iter__(self):
        for u in self._reader1:
            for e in self._reader2:
                yield (u, e)

    @classmethod
    def from_json(cls, input_shard_state):
        reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
        reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])

        return cls(reader1, reader2)

    def to_json(self):
        json_dict = {}
        json_dict[self._KIND1_PARAM] = self._reader1.to_json()
        json_dict[self._KIND2_PARAM] = self._reader2.to_json()
        return json_dict

    @classmethod
    def split_input(cls, mapper_spec):
        params = mapper_spec.params
        app = params.get(cls._APP_PARAM)
        kind1 = params.get(cls._KIND1_PARAM)
        kind2 = params.get(cls._KIND2_PARAM)
        shard_count = mapper_spec.shard_count
        shard_count_sqrt = int(math.sqrt(shard_count))

        splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
        splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
        inputs = []

        for u in splitted1:
            for e in splitted2:
                inputs.append(TwoKindsInputReader(u, e))

        #mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
        return inputs

    @classmethod
    def validate(cls, mapper_spec):
        return True #TODO

这段代码适合在你需要处理两种实体的所有组合时使用。你也可以把这个方法扩展到处理更多种类的实体。

下面是一个有效的mapreduce.yaml文件,用于TwoKindsInputReader

mapreduce:
- name: recommendationMapReduce
  mapper:
    input_reader: customInputReaders.TwoKindsInputReader
    handler: recommendation.calculateCorrelationHandler
    params:
    - name: kind1
      default: kinds.User
    - name: kind2
      default: kinds.Item
    - name: shard_count
      default: 16

撰写回答