MRjob:减速器可以执行两个操作吗?
我想计算每个从映射器生成的键值对的概率。
假设映射器生成了:
a, (r, 5)
a, (e, 6)
a, (w, 7)
我需要把5+6+7加起来,结果是18,然后计算概率,分别是5/18、6/18和7/18。
所以,最后从归约器得到的输出应该是:
a, [[r, 5, 0.278], [e, 6, 0.33], [w, 7, 0.389]]
目前为止,我只能让归约器把所有的整数加起来。
我该怎么做才能让它回去,把每个数值都除以总和呢?
谢谢!
3 个回答
你可以像你之前做的那样,简单地计算总和,同时把配对的数据保存在内存中,这样就能输出你想要的概率,方法如下:
reduce (key, list<values>):
int sum = 0;
for (value in values) {
sum = sum + value.frequency; //assuming you can extract two fields in each value: value.word and value.frequency
}
String outputValue = "[";
for (value in values) { //iterate over the values once more
outputValue = outputValue + "["+ value.word + ", " +value.frequency + ", "+ value.frequency/sum +"],"
}
outputValue = outputValue.replaceLast(",","]");
emit (key, outputValue);
当然,这只是个伪代码,因为我不太熟悉Python,不过我希望转换过来应该很简单。
你上面所做的应该是可以的,但这是假设所有与某个键相关的数据都能放进内存。如果数据能放下,那在Reducer阶段你就可以把所有的值都放在内存里,然后计算总和,再算出每个键值对的边际值。这种方法通常被称为“条纹”方法。
不过,大多数情况下,数据可能放不下内存。在这种情况下,你需要找到一种方法,在处理实际的键值对之前先发送一些值来计算总和,这样就能用这些值来计算边际值,并立即输出结果。
这就是“反转顺序”设计模式的一个应用。当你需要计算相对频率时,这个方法很有用。基本的思路是在Mapper阶段,对于每个中间数据,你会输出两个键值对,其中一个键值对的键对于所有值都是相同的。这一部分将用于计算总和。
举个例子:
For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5
For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6
For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7
完成这个后,你需要一个分区器,它会根据键中的第一个值来划分每个中间的键值对。在上面的例子中就是用“a”。
你还需要一个键的排序规则,确保在键的第二部分有 * 的键总是排在最前面。
这样,所有在键的第一部分有“a”的中间键都会被分到同一个reducer里。而且它们会按照下面的方式排序 -
emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7
在reducer中,当你遍历这些键值对时,如果键的第二部分有 *,你就需要把这些值累加起来。然后你可以用这个累加的值来计算其他键值对的边际值。
total = 0
for(value : values){
if (key.second == *)
total += value
else
emit (key.first , key.second, value, value/total)
}
这种设计模式通常被称为反转顺序,它使用了对偶的方式。如果你想了解更多关于这个和其他设计模式的信息,我建议你阅读这本书中关于MapReduce设计模式的章节 - http://lintool.github.com/MapReduceAlgorithms/。里面有很好的解释和例子。
Pai的解决方案在技术上是对的,但在实际操作中会让你遇到很多麻烦,因为设置分区会非常复杂(可以参考这个链接:https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k)。
你可以通过使用mrjob.step来更轻松地完成这个任务,然后创建两个归约器,比如这个例子:https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py
如果你想按照你描述的方式来做:
from mrjob.job import MRJob
import re
from mrjob.step import MRStep
from collections import defaultdict
wordRe = re.compile(r"[\w]+")
class MRComplaintFrequencyCount(MRJob):
def mapper(self, _, line):
self.increment_counter('group','num_mapper_calls',1)
#Issue is third column in csv
issue = line.split(",")[3]
for word in wordRe.findall(issue):
#Send all map outputs to same reducer
yield word.lower(), 1
def reducer(self, key, values):
self.increment_counter('group','num_reducer_calls',1)
wordCounts = defaultdict(int)
total = 0
for value in values:
word, count = value
total+=count
wordCounts[word]+=count
for k,v in wordCounts.iteritems():
# word, frequency, relative frequency
yield k, (v, float(v)/total)
def combiner(self, key, values):
self.increment_counter('group','num_combiner_calls',1)
yield None, (key, sum(values))
if __name__ == '__main__':
MRComplaintFrequencyCount.run()
这个代码会进行标准的单词计数,主要在合并器中进行汇总,然后使用“None”作为公共键,这样每个单词都会间接地以相同的键发送到归约器。在归约器中,你可以得到总的单词计数,并计算相对频率。