多文件中所有行的MapReduce成对比较
我刚开始使用Python的mrjob,想把一些运行时间很长的Python程序转换成MapReduce的Hadoop任务。我已经成功运行了简单的单词计数示例,并且大致理解了“文本分类”的例子。
不过,我在弄清楚如何将我的问题解决时遇到了一些困难。
我有大约6000个文件,每个文件有2到800行。每一行都是用空格分隔的简单“信号”。我需要比较每个文件中每一行和所有其他文件中每一行(包括它自己)之间的相关性。然后根据相关系数输出结果。
这是一个文件的例子:
1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...
我需要将这个文件中的每一行与其他每个文件中的每一行配对……或者我可以把所有文件合并成一个文件,如果这样更简单,但我仍然需要进行逐对的迭代。
我知道如何进行计算,以及如何使用最后的归约步骤来汇总和过滤结果。现在我遇到的难题是,如何在不一次性读取所有文件的情况下,将所有的配对项传递给后续步骤?我想我可以提前准备一个输入文件,使用itertools.product
,但这个文件可能会大得惊人。
1 个回答
1
好吧,既然没有人给出答案,我就把我现在的解决方法分享出来,希望能帮助到其他需要的人。我不确定这个方法有多“标准”或高效,但到目前为止它一直有效。
我把文件名放在每一行的最前面,然后加上一个 \t
,接着是其他数据。为了简单起见,这个例子我每行只用一个数字,然后计算它们的平均值,这只是一个非常简单的例子。
接着,我在 mrjob
中做了以下的映射-归约步骤。
class MRAvgPairwiseLines(MRJob):
def input_mapper(self, _, value):
"""Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""
fnum, val = value.split('\t')
yield 'ALL', (fnum, val)
def input_reducer(self, key, values):
for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
yield fnum1, (fnum1, fnum2, val1, val2)
def do_avg(self, key, value):
fnum1, fnum2, val1, val2 = value
res = (float(val1)+float(val2))/float(2)
yield key, (fnum2, res)
def get_max_avg(self, key, values):
max_fnum, max_avg = max(values, key = lambda x: x[1])
yield key, (max_fnum, max_avg)
def steps(self):
return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]
这样,所有来自 input_mapper
函数的输出都会被分组到同一个 input_reducer
中,然后这个函数会依次返回一对对的数据。接着这些数据会被传递到合适的位置,最终返回最大的平均值(实际上就是所有其他文件中的最大项)。
希望这能帮助到某些人。