Hadoop中Map函数的输入分割

1 投票
2 回答
1705 浏览
提问于 2025-04-20 09:53

这是我第一次在Hadoop上实现项目。我正在尝试在Map Reduce中实现我的算法,处理一个概率数据集。在我的数据集中,最后一列会有一些ID(数据集中唯一ID的数量等于我集群中节点的数量)。我需要根据这一列的值来划分我的数据集,每一组记录应该由我集群中的每个节点来处理。

举个例子,如果我在集群中有三个节点,那么对于下面的数据集,一个节点应该处理所有ID为1的记录,另一个节点处理ID为2的记录,还有一个节点处理ID为3的记录。

name time  dept  id
--------------------
 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

我的map函数应该将每个划分的数据作为输入,并在每个节点上并行处理。

我只是想弄清楚,在Hadoop中可以采用哪种方法。是将这个数据集作为输入传递给我的map函数,并传递一个额外的参数来根据ID值划分数据?还是提前将数据划分为“n”(节点数量)个子集,然后加载到节点中?如果这是正确的方法,如何根据值来划分数据并加载到不同的节点呢?因为我从阅读中了解到,Hadoop是根据指定的大小将数据划分为块的。我们如何在加载时指定特定的条件呢?另外,我是在用Python写我的程序。

请大家给点建议。谢谢!

2 个回答

0

如果我理解你的问题没错,最好的方法是把你的数据集加载到一个hive表里,然后用python写一个UDF(用户定义函数)。接下来,你可以这样做:

select your_python_udf(name, time, dept, id) from table group by id;

这看起来像是一个减少阶段,所以在你执行查询之前,可能需要做这个。

set mapred.reduce.tasks=50;

如何创建自定义的UDF:

Hive 插件

创建函数

1

对你来说,最简单的方法可能就是让映射器把数据输出时用id作为键,这样可以确保一个归约器会收到所有特定id的记录,然后你可以在归约阶段进行处理。

举个例子,

输入数据:

 b1  2:00pm z1   1
 b2  3:00pm z2   2
 c1  4:00pm y2   1
 b3  3:00pm z3   3
 c4  4:00pm x2   2

映射器代码:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[-1]
    print key + "\t" + line

映射输出:

 1  b1  2:00pm z1   1
 2  b2  3:00pm z2   2
 1  c1  4:00pm y2   1
 3  b3  3:00pm z3   3
 2  c4  4:00pm x2   2

归约器1的输入:

 1  b1  2:00pm z1   1
 1  c1  4:00pm y2   1

归约器2的输入:

 2  b2  3:00pm z2   2

归约器3的输入:

 3  b3  3:00pm z3   3

归约器代码:

#!/usr/bin/env python
import sys
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    orig_line = "\t".join(cols[1:])
    # do stuff...

注意,这样一个归约器可能会接收到多个键,但数据会是有序的,并且你可以通过 mapred.reduce.tasks 选项来控制归约器的数量。

编辑

如果你想在归约器中按键收集数据,可以这样做(不确定这样写是否能直接运行,但你能明白意思)。

#!/usr/bin/env python
import sys
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

last_key = None
data = []
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    if last_key and key != last_key:
        process_data(last_key, data)
        data = []
    orig_line = "\t".join(cols[1:])
    data.append(orig_line)
    last_key = key
process_data(last_key, data)

如果你不担心在归约步骤中内存不够,可以把代码简化成这样:

#!/usr/bin/env python
import sys
from collections import defaultdict
def process_data(key_id, data_list):
   # data_list has all the lines for key_id

all_data = defaultdict(list)
for line in sys.stdin:
    line = line.strip()
    cols = line.split("\t")
    key = cols[0]
    orig_line = "\t".join(cols[1:])
    all_data[key].append(orig_line)
for key, data in all_data.iteritems():
    process_data(key, data)

撰写回答