hadoop streaming:如何将键值列表传递给reducer?
当我们用 Java
写 map/reduce
程序时,map 负责收集数据,而 reducer 则接收每个键对应的一系列值,像这样:
Map(k, v) -> k1, v1
then shuffle and sort happens
then reducer gets it
reduce(k1, List<values>)
然后,问题是我们能不能用 python
通过 streaming
来做同样的事情呢?我参考了 这个链接,看起来 reducer 是根据命令行提供的数据逐行接收的。
4 个回答
PipeReducer 是 Hadoop 流处理中的一个 reducer 实现。这个 reducer 会接收键/值对,然后逐个处理这些数据,并将结果以键/值的形式发送到标准输入(STDIN),而不是以键/值对的形式。这是 Hadoop 流处理的默认行为。我没有看到有什么选项可以改变这一点,除非对 Hadoop 的代码进行了修改。
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
.....
while (values.hasNext()) {
.....
inWriter_.writeKey(key);
inWriter_.writeValue(val);
.....
}
}
也许这对你有帮助。我从apache.org找到了这个信息。
自定义如何将行拆分为键/值对
之前提到过,当Map/Reduce框架从mapper的标准输出中读取一行时,它会将这一行拆分成一个键/值对。默认情况下,这一行中第一个制表符(tab字符)之前的部分是键,而其余部分(不包括制表符)是值。
不过,你可以自定义这个默认设置。你可以指定一个不同于制表符的字段分隔符(默认是制表符),还可以指定行中第n个字符(n >= 1)作为键和值之间的分隔符,而不是默认的第一个字符。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper org.apache.hadoop.mapred.lib.IdentityMapper \
-reducer org.apache.hadoop.mapred.lib.IdentityReducer \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4
在上面的例子中,-D stream.map.output.field.separator=.
指定了“.”作为map输出的字段分隔符,直到行中第四个“.”之前的部分将作为键,而其余部分(不包括第四个“.”)将作为值。如果一行中少于四个“.”,那么整行将作为键,而值将是一个空的Text对象(就像通过new Text("")创建的那样)。
同样,你可以使用-D stream.reduce.output.field.separator=SEP
和-D stream.num.reduce.output.fields=NUM
来指定reduce输出中第n个字段分隔符作为键和值之间的分隔符。
此外,你还可以指定stream.map.input.field.separator
和stream.reduce.input.field.separator
作为map/reduce输入的输入分隔符。默认的分隔符是制表符。
在Hadoop Streaming中,mapper会把键值对写入到sys.stdout
。Hadoop会负责数据的整理和排序,然后把结果传给mapper,放在sys.stdin
里。你如何处理map和reduce完全由你决定,只要遵循这个模式(从stdout输出map结果,从stdin输入reduce数据)。这也是为什么你可以在不依赖Hadoop的情况下,通过命令行使用cat data | map | sort | reduce
来测试。
传给reducer的输入是和mapper输出的一样的键值对,但这些数据是经过排序的。你可以像示例那样遍历结果并累加总数,或者更进一步,使用itertools.groupby()
,这样就能得到类似于k1, List<values>
的输入格式,这种格式在使用reduce()
内置函数时非常好用。
关键是,如何实现reduce的部分完全由你来决定。