hadoop streaming:如何将键值列表传递给reducer?

2 投票
4 回答
7408 浏览
提问于 2025-04-17 03:44

当我们用 Javamap/reduce 程序时,map 负责收集数据,而 reducer 则接收每个键对应的一系列值,像这样:

Map(k, v) -> k1, v1  
    then shuffle and sort happens  
    then reducer gets it  

reduce(k1, List<values>)  

然后,问题是我们能不能用 python 通过 streaming 来做同样的事情呢?我参考了 这个链接,看起来 reducer 是根据命令行提供的数据逐行接收的。

4 个回答

1

PipeReducer 是 Hadoop 流处理中的一个 reducer 实现。这个 reducer 会接收键/值对,然后逐个处理这些数据,并将结果以键/值的形式发送到标准输入(STDIN),而不是以键/值对的形式。这是 Hadoop 流处理的默认行为。我没有看到有什么选项可以改变这一点,除非对 Hadoop 的代码进行了修改。

public void reduce(Object key, Iterator values, OutputCollector output,
                 Reporter reporter) throws IOException {

    .....
    while (values.hasNext()) {
    .....
        inWriter_.writeKey(key);
        inWriter_.writeValue(val);
    .....      
    }
}
5

也许这对你有帮助。我从apache.org找到了这个信息。

自定义如何将行拆分为键/值对

之前提到过,当Map/Reduce框架从mapper的标准输出中读取一行时,它会将这一行拆分成一个键/值对。默认情况下,这一行中第一个制表符(tab字符)之前的部分是键,而其余部分(不包括制表符)是值。

不过,你可以自定义这个默认设置。你可以指定一个不同于制表符的字段分隔符(默认是制表符),还可以指定行中第n个字符(n >= 1)作为键和值之间的分隔符,而不是默认的第一个字符。例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper org.apache.hadoop.mapred.lib.IdentityMapper \
    -reducer org.apache.hadoop.mapred.lib.IdentityReducer \
    -D stream.map.output.field.separator=. \
    -D stream.num.map.output.key.fields=4 

在上面的例子中,-D stream.map.output.field.separator=.指定了“.”作为map输出的字段分隔符,直到行中第四个“.”之前的部分将作为键,而其余部分(不包括第四个“.”)将作为值。如果一行中少于四个“.”,那么整行将作为键,而值将是一个空的Text对象(就像通过new Text("")创建的那样)。

同样,你可以使用-D stream.reduce.output.field.separator=SEP-D stream.num.reduce.output.fields=NUM来指定reduce输出中第n个字段分隔符作为键和值之间的分隔符。

此外,你还可以指定stream.map.input.field.separatorstream.reduce.input.field.separator作为map/reduce输入的输入分隔符。默认的分隔符是制表符。

1

在Hadoop Streaming中,mapper会把键值对写入到sys.stdout。Hadoop会负责数据的整理和排序,然后把结果传给mapper,放在sys.stdin里。你如何处理map和reduce完全由你决定,只要遵循这个模式(从stdout输出map结果,从stdin输入reduce数据)。这也是为什么你可以在不依赖Hadoop的情况下,通过命令行使用cat data | map | sort | reduce来测试。

传给reducer的输入是和mapper输出的一样的键值对,但这些数据是经过排序的。你可以像示例那样遍历结果并累加总数,或者更进一步,使用itertools.groupby(),这样就能得到类似于k1, List<values>的输入格式,这种格式在使用reduce()内置函数时非常好用。

关键是,如何实现reduce的部分完全由你来决定。

撰写回答