Python中的groupby和列表推导问题
我从一个Hadoop教程中学到这个。这是一个Reducer,基本上是从标准输入中接收(单词,计数)对,然后把它们加起来。
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
data = read_mapper_output(sys.stdin, separator=separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
total_uppercount = sum(int(count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except ValueError:
pass
现在,我想接收(单词,计数1,计数2)这样的元组,但这里的 groupby
和 sum(int(count for current_word, count in group)
让我完全看不懂。我该如何修改这部分代码,让它继续执行现在的功能,但能处理第二个计数值呢?也就是说,输入是(单词,计数1,计数2),输出也是(单词,计数1,计数2)。
编辑 1:
from itertools import groupby, izip
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 2)
def main(separator='\t'):
data = read_mapper_output(sys.stdin, separator=separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
counts_a, counts_b = izip((int(count_a), int(count_b)) for current_word, count_a, count_b in group)
t1, t2 = sum(counts_a), sum(counts_b)
print "%s%s%d%s%d" % (current_word, separator, t1, separator, t2)
except ValueError:
pass
这是一个Hadoop作业,所以输出是这样的:
11/11/23 18:44:21 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob: map 100% reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob: map 100% reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob: map 100% reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob: map 100% reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!
从日志中:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.
2 个回答
from collections import defaultdict
def main(separator='\t'):
data = read_mapper_output(sys.stdin, separator=separator)
counts = defaultdict(lambda: [0, 0])
for word, (count1, count2) in data:
values = counts[word]
values[0] += count1
values[1] += count2
for word, (count1, count2) in counts.iteritems():
print('{0}\t{1}\t{2}'.format(word, count1, count2))
当然可以!请把你想要翻译的内容发给我,我会帮你用简单易懂的语言解释清楚。
groupby
这是来自itertools
模块的groupby
函数,详细信息可以在这里找到。data
是通过对每个元素应用itemgetter(0)
(这是operator
模块中的一个itemgetter
类的实例,详细信息可以在这里找到)进行“分组”的。它返回一对(键结果,包含该键的元素的迭代器)。所以,每次循环时,current_word
就是与一堆data
行共同的“单词”(即通过itemgetter
提取的第一个项目),而group
是一个迭代器,遍历以该word
开头的data
行。根据你代码的文档,每行文件都有两个单词:一个实际的“单词”和一个计数(文本意图被解释为数字)
sum(int(count) for current_word, count in group)
这字面意思就是这样:对每个在group
中找到的(current_word
,count
)对,计算count
的整数值之和。每个group
都是来自data
的行集合,如上所述。所以我们取所有以current_word
开头的行,将它们的字符串count
值转换为整数,然后加起来。
我该如何修改这段代码,使它基本上继续执行现在的功能,但增加一个第二个计数值?也就是说,输入是(单词,计数1,计数2),输出是(单词,计数1,计数2)。
那么,你希望每个计数代表什么?你希望数据来自哪里?
我将采取我认为最简单的解释:你将修改数据文件,使每行有三个项目,并且你将分别对每列的数字进行求和。
groupby
部分将保持不变,因为我们仍然以相同的方式对行进行分组,并且仍然根据“单词”进行分组。
sum
部分需要计算两个值:第一列数字的总和和第二列数字的总和。
当我们遍历group
时,我们将得到三项值的集合,所以我们想将它们拆分为三个值:例如current_word, group_a, group_b
。对于这些,我们希望对每行的两个数字都进行整数转换。这将给我们一系列数字对;如果我们想将所有的第一个数字和所有的第二个数字相加,那么我们应该将它们变成一对数字序列。为此,我们可以使用另一个itertools
函数,叫做izip
。然后我们可以分别对每个序列进行求和,再将它们拆分成两个单独的数字序列变量,并进行求和。
因此:
counts_a, counts_b = izip(
(int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
total_a, total_b = sum(counts_a), sum(counts_b)
或者我们可以通过再次使用同样的(x for y in z)技巧,简单地制作一对计数:
totals = (
sum(counts)
for counts in izip(
(int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
)
尽管这个结果在打印语句中使用起来会有点棘手 :)