Python Hadoop Streaming中的Combiner函数
我有一个映射器,它输出键和值,这些数据已经排好序,然后传给了reducer.py。
因为这些键已经排好序,所以在进入reducer之前,我想写一个合并器,它会遍历这个排好序的列表,输出一个键和一个值的列表,比如[key, [v1, v2, v3]],这个结果会在reducer中使用。
数据处理的流程是这样的:先用命令“cat data | python mapper.py | sort | python reducer.py”。
我想知道怎么写一个reducer,这样就不需要用一个字典来存储所有的键,这样会占用很多内存。
1 个回答
5
>>> import itertools
>>> import operator
>>> foo = [("a", 1), ("a", 2), ("b", 1), ("c", 1), ("c", 2)]
>>> for group in itertools.groupby(foo, operator.itemgetter(0)):
... print group[0], list(map(operator.itemgetter(1), group[1]))
...
a [1, 2]
b [1]
c [1, 2]
解释:
groupby
,顾名思义,是用来把一个可迭代的对象中的元素根据某个关键函数分成一组一组的。简单来说,它会先对可迭代对象的第一个元素调用 keyfunc
,然后一个一个地从可迭代对象中取出元素,直到 keyfunc
的返回值发生变化。这个时候,它会把到目前为止得到的所有元素都返回,然后从新的关键值开始重新分组。它的工作方式也很聪明,不会消耗不必要的内存;一旦返回了某些值,groupby
就不再保留这些值了。
在这里,我们通过 operator.itemgetter(0)
来对输入的元素进行分组,这个函数就像一个实用的“工具箱”,它把 x
映射到 x[0]
。换句话说,我们是根据元组的第一个元素来分组的,这个元素就是关键值。
当然,你需要写一个自定义的生成器来处理输入的读取(可能是从 sys.stdin
中读取),并且一个一个地返回这些元素。幸运的是,这个过程很简单,只需要使用 yield
关键字。
另外需要注意的是,这个方法假设关键值是已经排好序的。如果关键值没有排序,那就没办法了:你需要一直查看到输入的最后,才能确保你得到了某个关键值的所有值。