MapReduce:如何在Mapper中跨多行跟踪状态(例如计数三元组)?

0 投票
1 回答
601 浏览
提问于 2025-04-17 20:33

我正在尝试用Python的mrjob框架写一个MapReduce程序来计算三元组。到目前为止,我的代码是这样的:

from mrjob.job import MRJob

class MRTrigram(MRJob):

    def mapper(self, _, line):
        w = line.split()
        for idx,word in enumerate(w):
            if idx < len(w) - 2:
                # Generate a trigram using the current word and next 2 words
                trigram = w[idx] + " " + w[idx + 1] + " " + w[idx + 2]
                yield trigram, 1

    def reducer(self, key, values):
        yield sum(values), key

# ignore this part - its just standard bolierplate for mrjob!
if __name__ == '__main__':
    MRTrigram.run()

可以看到,我还没有处理三元组跨行的情况(比如,第三行末尾是“it was”,第四行开头是“the best of times”,但我的代码在这种情况下无法捕捉到三元组“it was the”!)。

我该如何在多个map调用之间保持状态,确保无论底层运行时如何分配任务,只有连续行之间的三元组被计算在内?我曾考虑在MRTrigram类中存储每行的最后两个单词,但后来我意识到我无法保证我比较的是第i行和第i+1行的单词(而不是第i行和第j行的单词,其中j可以是文档中的任何行!)。

有没有什么想法可以让我走上正轨?

1 个回答

0

你可以通过编写自定义协议来获取一些提示,了解如何实现这个功能。不过,我觉得mrjob在处理输入时是按换行符来分隔的,也就是说在你添加自定义行为(比如形成键值对)之前,它会先处理这些换行符,所以在mrjob中可能无法做到这一点。

如果你在使用Hadoop(也就是原生Java),那么你可以编写一个自定义输入格式,来处理多行文本,并从中解析出键值对。

撰写回答