mrjob:示例如何自动识别文本文件中的行?
我正在尝试更好地理解mrjob的例子。
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):
def mapper(self, _, line):
yield "chars", len(line)
yield "words", len(line.split())
yield "lines", 1
def reducer(self, key, values):
yield key, sum(values)
if __name__ == '__main__':
MRWordFrequencyCount.run()
我通过
$ python word_count.py my_file.txt
来运行它,结果如我所预期的那样,但我不明白它是怎么自动知道要读取一个文本文件并按每行分割的。我也不太确定_
是干什么用的。
根据我的理解,mapper()
会为每一行生成三个键值对,对吧?如果我想处理一个文件夹里的每个文件,该怎么做呢?
而reducer()
是怎么自动知道要把每个键的值加起来的呢?
如果我想通过map reduce来运行单元测试,mapper和reducer应该是什么样子的?这样做有必要吗?
3 个回答
from mrjob.job import MRJob
class MRRatingCounter(MRJob):
def mapper(self, key, line):
(userID, movieID, rating, timestamp) = line.split('\t')
yield rating, 1
def reducer(self, rating, occurences):
yield rating, sum(occurences)
if __name__ == '__main__':
MRRatingCounter.run()
所以请根据上面的讨论纠正我:
如果我错了请纠正我,在这种情况下,key是输入文件的值,我可以这样理解吗:
def mapper ( self{ 这是对象/实例} , key{ 输入文本文件,在这个例子中是文件名,正确的应该是 ml-100k/u.data , line{ 这是我们每次尝试从数据文件传递给mapper()的内容。}
还有一个来自Udemy的代码,我正在学习,正如原问题所问的:
class MRFriendsByAge(MRJob):
def mapper(self, _, line):
(ID, name, age, numFriends) = line.split(',')
yield age, float(numFriends)
def reducer(self, age, numFriends):
total = 0
numElements = 0
for x in numFriends:
total += x
numElements += 1
yield age, total / numElements
如果 name == 'main':
MRFriendsByAge.run()
我对mrjob了解不多,所以我会做一些假设。首先,前面的下划线(_)表示可以忽略这个键(我在谷歌搜索后确认了这一点)。其次,我猜它应该可以处理用逗号分隔的文件列表或者一个文件夹。接下来,这段代码没有设置部分,可能是因为这些都是默认的方法名称。如果你把你的mapper或reducer命名得不一样,mrjob可能就无法自动识别了。
我在这里找到了一些例子。
mapper方法接收的是从输入文本中解析出来的键值对。mrjob使用Hadoop流处理,每一行输入文本都是通过换行符分开的,然后根据使用的输入协议将每一行分成键值对。这些都是框架为你处理好的,所以你不需要费心去做复杂的工作;你只需要相信你会得到正确的键和值。
不过,你需要指定输入文本文件的类型。例如,如果键和值不是普通文本(就像原问题中提到的),而是序列化的JSON格式,那么你就要使用JSONProtocol/JSONValueProtocol等,而不是默认的RawValueProtocol。
对于初始的mapper,每一行会通过RawValueProtocol读取到值中,所以你不会收到键。使用_
只是Python中一个约定,用来表示一个未使用的虚拟变量。(不过,_
其实也是一个有效的Python变量名。你可以这样做:a = 3; _ = 2; b = a + _
。这听起来有点奇怪,对吧?)
mrjob可以处理多个输入文件。你可以这样做:
$ python wordcount.py text1.txt text2.txt
如果你想让所有文本文件作为mrjob任务的输入,可以这样做:
$ python wordcount.py inputdir/*.txt
或者简单地这样:
$ python wordcount.py inputdir
这样所有选择的文件都会作为输入。
reducer接收到的是一个键和与这个键相关的所有值的迭代器。所以在reducer方法中,values
这个变量就是一个迭代器。如果你想对所有值做一些操作,你需要实际遍历它们。在原问题中的具体例子里,内置函数sum
可以接受一个迭代器作为参数,这就是为什么你可以一次性完成这个操作。但实际上这和sum([value for value in values])
是类似的。
我其实不知道怎么对mrjob脚本进行单元测试。我通常只是先在一小部分测试数据上进行测试,然后再进行生产运行。