Hadoop中Map函数的输入分割
这是我第一次在Hadoop上实现项目。我正在尝试在Map Reduce中实现我的算法,处理一个概率数据集。在我的数据集中,最后一列会有一些ID(数据集中唯一ID的数量等于我集群中节点的数量)。我需要根据这一列的值来划分我的数据集,每一组记录应该由我集群中的每个节点来处理。
举个例子,如果我在集群中有三个节点,那么对于下面的数据集,一个节点应该处理所有ID为1的记录,另一个节点处理ID为2的记录,还有一个节点处理ID为3的记录。
name time dept id
--------------------
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2
我的map函数应该将每个划分的数据作为输入,并在每个节点上并行处理。
我只是想弄清楚,在Hadoop中可以采用哪种方法。是将这个数据集作为输入传递给我的map函数,并传递一个额外的参数来根据ID值划分数据?还是提前将数据划分为“n”(节点数量)个子集,然后加载到节点中?如果这是正确的方法,如何根据值来划分数据并加载到不同的节点呢?因为我从阅读中了解到,Hadoop是根据指定的大小将数据划分为块的。我们如何在加载时指定特定的条件呢?另外,我是在用Python写我的程序。
请大家给点建议。谢谢!
2 个回答
1
对你来说,最简单的方法可能就是让映射器把数据输出时用id作为键,这样可以确保一个归约器会收到所有特定id的记录,然后你可以在归约阶段进行处理。
举个例子,
输入数据:
b1 2:00pm z1 1
b2 3:00pm z2 2
c1 4:00pm y2 1
b3 3:00pm z3 3
c4 4:00pm x2 2
映射器代码:
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[-1]
print key + "\t" + line
映射输出:
1 b1 2:00pm z1 1
2 b2 3:00pm z2 2
1 c1 4:00pm y2 1
3 b3 3:00pm z3 3
2 c4 4:00pm x2 2
归约器1的输入:
1 b1 2:00pm z1 1
1 c1 4:00pm y2 1
归约器2的输入:
2 b2 3:00pm z2 2
归约器3的输入:
3 b3 3:00pm z3 3
归约器代码:
#!/usr/bin/env python
import sys
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
orig_line = "\t".join(cols[1:])
# do stuff...
注意,这样一个归约器可能会接收到多个键,但数据会是有序的,并且你可以通过 mapred.reduce.tasks 选项来控制归约器的数量。
编辑
如果你想在归约器中按键收集数据,可以这样做(不确定这样写是否能直接运行,但你能明白意思)。
#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
# data_list has all the lines for key_id
last_key = None
data = []
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
if last_key and key != last_key:
process_data(last_key, data)
data = []
orig_line = "\t".join(cols[1:])
data.append(orig_line)
last_key = key
process_data(last_key, data)
如果你不担心在归约步骤中内存不够,可以把代码简化成这样:
#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
# data_list has all the lines for key_id
all_data = defaultdict(list)
for line in sys.stdin:
line = line.strip()
cols = line.split("\t")
key = cols[0]
orig_line = "\t".join(cols[1:])
all_data[key].append(orig_line)
for key, data in all_data.iteritems():
process_data(key, data)