高效减少MapReduce结果的方法?
我写了一个MapReduce任务,用来处理一个数据集中的ngram计数。结果保存在一百个300MB的文件里,格式是<ngram>\t<count>
。我想把这些结果合并成一个,但我尝试了几次合并都崩溃了(“任务跟踪器失去响应”)。我设置的超时时间是8小时,而崩溃发生在大约8.5小时,所以可能有关系。我设置了5个reducer(和节点数量相同)。也许我只需要留更多的时间,尽管错误信息似乎并没有指向这一点。我怀疑我的节点负载过重,变得无响应。我推测我的reducer可能需要一些优化。
我在mapper中使用了cat
,并且用以下的python脚本作为我的reducer:
#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if key not in counts:
counts[key] = 0
counts[key] += count
for key in sorted(counts.keys()):
print '%s\t%s'% (key, counts[key])
更新:
正如我在评论中提到的,我对Hadoop自动进行的排序有些困惑。在网页界面上,reducer的状态显示了几个不同的阶段,包括“排序”和“减少”。从这一点我推测Hadoop在发送mapper输出到reducer之前会先进行排序,但不清楚的是,这个排序是针对发送到reducer的所有数据,还是针对每个文件在减少之前进行的。换句话说,我的mapper处理这100个文件,把它们分成400个输出,每个输出都简单地用cat
发送到reducer,然后5个reducer各自接收这些80个流。排序是把所有80个流合并在一起,还是逐个排序,减少后再处理?根据图表,虽然这些图表可能并不准确地反映实际情况,排序过程似乎是在任何减少之前进行的。如果排序确实是对所有输入文件进行的,那么我可以简化我的reducer,不需要存储所有计数的字典,只需在键变化时打印出键和总计数的配对。
关于使用combiner,我觉得在我的情况下没有什么好处,因为我正在减少的数据已经在我尝试合并的100个文件中被减少过了。由于我的节点数量等于reducer数量(都是5),所以没有什么可以合并的,reducer已经在做这些工作了。
2 个回答
使用 top
命令来检查你的 reducer 是否是 CPU 密集型的,而不是 IO 密集型的(这可能会导致交换)。
每台主机处理 20 个任务需要 8 小时,这意味着每个 300Mb 的任务大约需要 24 分钟。
你可以考虑使用 heapq
,这样在内存中构建的数据结构会保持有序:详细信息请查看以下链接的第 8.4.1 节:
http://docs.python.org/library/heapq.html
我之前对MapReduce的工作原理理解有误。进入Reducer的所有数据都是经过排序的。我上面的代码完全没有优化。其实,我只需要记录当前的键,然后在出现新键时打印出之前的那个键就可以了。
#!/usr/bin/env python
import sys
cur_key = None
cur_key_count = 0
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
# if new key, reset count, note current key, and output lastk key's result
if key != cur_key:
if cur_key is not None:
print '%s\t%s'% (cur_key, cur_key_count)
cur_key = key
cur_key_count = 0
cur_key_count += count
# printing out final key if set
if cur_key:
print '%s\t%s'% (cur_key, cur_key_count)