在Google App Engine上对多个数据存储类型进行MapReduce
我刚刚看了2010年Google I/O的App Engine批量数据处理会议,还读了一些Google研究的MapReduce文章,现在我在考虑用Google App Engine上的MapReduce来实现一个推荐系统,使用Python编程。
我更喜欢使用appengine-mapreduce,而不是任务队列API,因为前者可以轻松遍历所有实例,自动进行批处理,自动链接任务等等。不过,我遇到一个问题:我的推荐系统需要计算两个不同模型实例之间的相关性,也就是说要处理两种不同类型的数据。
举个例子:
我有这两个模型:用户(User)和物品(Item)。每个模型都有一个标签列表作为属性。下面是计算用户和物品之间相关性的函数。需要注意的是,calculateCorrelation
函数应该对每一对用户和物品组合调用:
def calculateCorrelation(user, item):
return calculateCorrelationAverage(u.tags, i.tags)
def calculateCorrelationAverage(tags1, tags2):
correlationSum = 0.0
for (tag1, tag2) in allCombinations(tags1, tags2):
correlationSum += correlation(tag1, tag2)
return correlationSum / (len(tags1) + len(tags2))
def allCombinations(list1, list2):
combinations = []
for x in list1:
for y in list2:
combinations.append((x, y))
return combinations
但是,这个calculateCorrelation
并不是appengine-mapreduce中的有效Mapper,可能这个函数甚至不符合MapReduce的计算概念。不过,我需要确认一下……如果能利用appengine-mapreduce的优势,比如自动批处理和任务链接,那就太好了。
有没有什么解决办法呢?
我应该定义自己的InputReader吗?一个新的InputReader可以读取两种不同类型的所有实例,这样能和当前的appengine-mapreduce实现兼容吗?
还是我应该尝试以下方法呢?
- 将这两种类型的所有实体的所有键两两组合,形成一个新的模型实例(可能使用MapReduce)
- 使用mappers遍历这个新模型的实例
- 对于每个实例,使用其中的键获取两种不同类型的实体,并计算它们之间的相关性。
2 个回答
如果没有更多关于你在计算什么的细节,很难给出具体的建议。一个简单的选择是在map调用中直接获取相关的实体——在这里进行数据存储操作是没有问题的。
不过,这样会导致很多小的调用。正如你所提到的,编写一个自定义的InputReader可以让你并行获取这两组实体,这样会大大提高性能。
如果你能提供更多关于如何连接这些实体的细节,我们可能能给出更具体的建议。
根据Nick Johnson的建议,我写了一个自己的输入读取器。这个读取器可以从两种不同的实体中获取数据。它会生成这些实体的所有组合。下面是代码:
class TwoKindsInputReader(InputReader):
_APP_PARAM = "_app"
_KIND1_PARAM = "kind1"
_KIND2_PARAM = "kind2"
MAPPER_PARAMS = "mapper_params"
def __init__(self, reader1, reader2):
self._reader1 = reader1
self._reader2 = reader2
def __iter__(self):
for u in self._reader1:
for e in self._reader2:
yield (u, e)
@classmethod
def from_json(cls, input_shard_state):
reader1 = DatastoreInputReader.from_json(input_shard_state[cls._KIND1_PARAM])
reader2 = DatastoreInputReader.from_json(input_shard_state[cls._KIND2_PARAM])
return cls(reader1, reader2)
def to_json(self):
json_dict = {}
json_dict[self._KIND1_PARAM] = self._reader1.to_json()
json_dict[self._KIND2_PARAM] = self._reader2.to_json()
return json_dict
@classmethod
def split_input(cls, mapper_spec):
params = mapper_spec.params
app = params.get(cls._APP_PARAM)
kind1 = params.get(cls._KIND1_PARAM)
kind2 = params.get(cls._KIND2_PARAM)
shard_count = mapper_spec.shard_count
shard_count_sqrt = int(math.sqrt(shard_count))
splitted1 = DatastoreInputReader._split_input_from_params(app, kind1, params, shard_count_sqrt)
splitted2 = DatastoreInputReader._split_input_from_params(app, kind2, params, shard_count_sqrt)
inputs = []
for u in splitted1:
for e in splitted2:
inputs.append(TwoKindsInputReader(u, e))
#mapper_spec.shard_count = len(inputs) #uncomment this in case of "Incorrect number of shard states" (at line 408 in handlers.py)
return inputs
@classmethod
def validate(cls, mapper_spec):
return True #TODO
这段代码适合在你需要处理两种实体的所有组合时使用。你也可以把这个方法扩展到处理更多种类的实体。
下面是一个有效的mapreduce.yaml文件,用于TwoKindsInputReader
:
mapreduce:
- name: recommendationMapReduce
mapper:
input_reader: customInputReaders.TwoKindsInputReader
handler: recommendation.calculateCorrelationHandler
params:
- name: kind1
default: kinds.User
- name: kind2
default: kinds.Item
- name: shard_count
default: 16