高效减少MapReduce结果的方法?

2 投票
2 回答
1411 浏览
提问于 2025-04-17 05:42

我写了一个MapReduce任务,用来处理一个数据集中的ngram计数。结果保存在一百个300MB的文件里,格式是<ngram>\t<count>。我想把这些结果合并成一个,但我尝试了几次合并都崩溃了(“任务跟踪器失去响应”)。我设置的超时时间是8小时,而崩溃发生在大约8.5小时,所以可能有关系。我设置了5个reducer(和节点数量相同)。也许我只需要留更多的时间,尽管错误信息似乎并没有指向这一点。我怀疑我的节点负载过重,变得无响应。我推测我的reducer可能需要一些优化。

我在mapper中使用了cat,并且用以下的python脚本作为我的reducer:

#!/usr/bin/env python
import sys

counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if key not in counts:
        counts[key] = 0
    counts[key] += count

for key in sorted(counts.keys()):
    print '%s\t%s'% (key, counts[key])

更新: 正如我在评论中提到的,我对Hadoop自动进行的排序有些困惑。在网页界面上,reducer的状态显示了几个不同的阶段,包括“排序”和“减少”。从这一点我推测Hadoop在发送mapper输出到reducer之前会先进行排序,但不清楚的是,这个排序是针对发送到reducer的所有数据,还是针对每个文件在减少之前进行的。换句话说,我的mapper处理这100个文件,把它们分成400个输出,每个输出都简单地用cat发送到reducer,然后5个reducer各自接收这些80个流。排序是把所有80个流合并在一起,还是逐个排序,减少后再处理?根据图表,虽然这些图表可能并不准确地反映实际情况,排序过程似乎是在任何减少之前进行的。如果排序确实是对所有输入文件进行的,那么我可以简化我的reducer,不需要存储所有计数的字典,只需在键变化时打印出键和总计数的配对。

关于使用combiner,我觉得在我的情况下没有什么好处,因为我正在减少的数据已经在我尝试合并的100个文件中被减少过了。由于我的节点数量等于reducer数量(都是5),所以没有什么可以合并的,reducer已经在做这些工作了。

2 个回答

1

使用 top 命令来检查你的 reducer 是否是 CPU 密集型的,而不是 IO 密集型的(这可能会导致交换)。

每台主机处理 20 个任务需要 8 小时,这意味着每个 300Mb 的任务大约需要 24 分钟。

你可以考虑使用 heapq,这样在内存中构建的数据结构会保持有序:详细信息请查看以下链接的第 8.4.1 节: http://docs.python.org/library/heapq.html

2

我之前对MapReduce的工作原理理解有误。进入Reducer的所有数据都是经过排序的。我上面的代码完全没有优化。其实,我只需要记录当前的键,然后在出现新键时打印出之前的那个键就可以了。

#!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%s\t%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count
# printing out final key if set
if cur_key:
    print '%s\t%s'% (cur_key, cur_key_count)

撰写回答