MRjob:减速器可以执行两个操作吗?

2 投票
3 回答
4738 浏览
提问于 2025-04-17 16:56

我想计算每个从映射器生成的键值对的概率。

假设映射器生成了:

a, (r, 5)
a, (e, 6)
a, (w, 7)

我需要把5+6+7加起来,结果是18,然后计算概率,分别是5/18、6/18和7/18。

所以,最后从归约器得到的输出应该是:

a, [[r, 5, 0.278], [e, 6, 0.33], [w, 7, 0.389]]

目前为止,我只能让归约器把所有的整数加起来。
我该怎么做才能让它回去,把每个数值都除以总和呢?

谢谢!

3 个回答

1

你可以像你之前做的那样,简单地计算总和,同时把配对的数据保存在内存中,这样就能输出你想要的概率,方法如下:

reduce (key, list<values>):
    int sum = 0;
    for (value in values) {
        sum = sum + value.frequency; //assuming you can extract two fields in each value: value.word and value.frequency
    }
    String outputValue = "[";
    for (value in values) { //iterate over the values once more
        outputValue = outputValue + "["+ value.word + ", " +value.frequency + ", "+ value.frequency/sum +"],"
    }
    outputValue = outputValue.replaceLast(",","]");
    emit (key, outputValue);

当然,这只是个伪代码,因为我不太熟悉Python,不过我希望转换过来应该很简单。

4

你上面所做的应该是可以的,但这是假设所有与某个键相关的数据都能放进内存。如果数据能放下,那在Reducer阶段你就可以把所有的值都放在内存里,然后计算总和,再算出每个键值对的边际值。这种方法通常被称为“条纹”方法。

不过,大多数情况下,数据可能放不下内存。在这种情况下,你需要找到一种方法,在处理实际的键值对之前先发送一些值来计算总和,这样就能用这些值来计算边际值,并立即输出结果。

这就是“反转顺序”设计模式的一个应用。当你需要计算相对频率时,这个方法很有用。基本的思路是在Mapper阶段,对于每个中间数据,你会输出两个键值对,其中一个键值对的键对于所有值都是相同的。这一部分将用于计算总和。

举个例子:

For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5


For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6


For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7

完成这个后,你需要一个分区器,它会根据键中的第一个值来划分每个中间的键值对。在上面的例子中就是用“a”。

你还需要一个键的排序规则,确保在键的第二部分有 * 的键总是排在最前面。

这样,所有在键的第一部分有“a”的中间键都会被分到同一个reducer里。而且它们会按照下面的方式排序 -

emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7

在reducer中,当你遍历这些键值对时,如果键的第二部分有 *,你就需要把这些值累加起来。然后你可以用这个累加的值来计算其他键值对的边际值。

total = 0
for(value : values){
    if (key.second == *)
        total += value
    else
        emit (key.first , key.second, value, value/total)
}

这种设计模式通常被称为反转顺序,它使用了对偶的方式。如果你想了解更多关于这个和其他设计模式的信息,我建议你阅读这本书中关于MapReduce设计模式的章节 - http://lintool.github.com/MapReduceAlgorithms/。里面有很好的解释和例子。

6

Pai的解决方案在技术上是对的,但在实际操作中会让你遇到很多麻烦,因为设置分区会非常复杂(可以参考这个链接:https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k)。

你可以通过使用mrjob.step来更轻松地完成这个任务,然后创建两个归约器,比如这个例子:https://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py

如果你想按照你描述的方式来做:

from mrjob.job import MRJob
import re
from mrjob.step import MRStep
from collections import defaultdict

wordRe = re.compile(r"[\w]+")

class MRComplaintFrequencyCount(MRJob):

    def mapper(self, _, line):
        self.increment_counter('group','num_mapper_calls',1)

        #Issue is third column in csv
        issue = line.split(",")[3]

        for word in wordRe.findall(issue):
            #Send all map outputs to same reducer
            yield word.lower(), 1

    def reducer(self, key, values):
        self.increment_counter('group','num_reducer_calls',1)  
        wordCounts = defaultdict(int)
        total = 0         
        for value in values:
            word, count = value
            total+=count
            wordCounts[word]+=count

        for k,v in wordCounts.iteritems():
            # word, frequency, relative frequency 
            yield k, (v, float(v)/total)

    def combiner(self, key, values):
        self.increment_counter('group','num_combiner_calls',1) 
        yield None, (key, sum(values))


if __name__ == '__main__':
    MRComplaintFrequencyCount.run()

这个代码会进行标准的单词计数,主要在合并器中进行汇总,然后使用“None”作为公共键,这样每个单词都会间接地以相同的键发送到归约器。在归约器中,你可以得到总的单词计数,并计算相对频率。

撰写回答