多文件中所有行的MapReduce成对比较

3 投票
1 回答
2734 浏览
提问于 2025-04-16 21:13

我刚开始使用Python的mrjob,想把一些运行时间很长的Python程序转换成MapReduce的Hadoop任务。我已经成功运行了简单的单词计数示例,并且大致理解了“文本分类”的例子。

不过,我在弄清楚如何将我的问题解决时遇到了一些困难。

我有大约6000个文件,每个文件有2到800行。每一行都是用空格分隔的简单“信号”。我需要比较每个文件中每一行和所有其他文件中每一行(包括它自己)之间的相关性。然后根据相关系数输出结果。

这是一个文件的例子:

1 2 3 4 2 3 1 2 3 4 1 2
2 2 3 1 3 3 1 2 3 1 4 1
2 3 4 5 3 2 1 3 4 5 2 1
...

我需要将这个文件中的每一行与其他每个文件中的每一行配对……或者我可以把所有文件合并成一个文件,如果这样更简单,但我仍然需要进行逐对的迭代。

我知道如何进行计算,以及如何使用最后的归约步骤来汇总和过滤结果。现在我遇到的难题是,如何在不一次性读取所有文件的情况下,将所有的配对项传递给后续步骤?我想我可以提前准备一个输入文件,使用itertools.product,但这个文件可能会大得惊人。

1 个回答

1

好吧,既然没有人给出答案,我就把我现在的解决方法分享出来,希望能帮助到其他需要的人。我不确定这个方法有多“标准”或高效,但到目前为止它一直有效。

我把文件名放在每一行的最前面,然后加上一个 \t,接着是其他数据。为了简单起见,这个例子我每行只用一个数字,然后计算它们的平均值,这只是一个非常简单的例子。

接着,我在 mrjob 中做了以下的映射-归约步骤。

class MRAvgPairwiseLines(MRJob):

def input_mapper(self, _, value):
    """Takes each input line and converts it to (fnum, num) and a key of 'ALL'"""

    fnum, val = value.split('\t')
    yield 'ALL', (fnum, val)

def input_reducer(self, key, values):

    for (fnum1, val1), (fnum2, val2) in product(values, repeat = 2):
        yield fnum1, (fnum1, fnum2, val1, val2)

def do_avg(self, key, value):

    fnum1, fnum2, val1, val2 = value
    res = (float(val1)+float(val2))/float(2)
    yield key, (fnum2, res)

def get_max_avg(self, key, values):

    max_fnum, max_avg = max(values, key = lambda x: x[1])
    yield key, (max_fnum, max_avg)

def steps(self):
    return [self.mr(mapper=self.input_mapper, reducer=self.input_reducer),
                self.mr(mapper=self.do_avg, reducer=self.get_max_avg)]

这样,所有来自 input_mapper 函数的输出都会被分组到同一个 input_reducer 中,然后这个函数会依次返回一对对的数据。接着这些数据会被传递到合适的位置,最终返回最大的平均值(实际上就是所有其他文件中的最大项)。

希望这能帮助到某些人。

撰写回答