Python中的groupby和列表推导问题

0 投票
2 回答
1257 浏览
提问于 2025-04-17 06:53

我从一个Hadoop教程中学到这个。这是一个Reducer,基本上是从标准输入中接收(单词,计数)对,然后把它们加起来。

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_uppercount = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            pass

现在,我想接收(单词,计数1,计数2)这样的元组,但这里的 groupbysum(int(count for current_word, count in group) 让我完全看不懂。我该如何修改这部分代码,让它继续执行现在的功能,但能处理第二个计数值呢?也就是说,输入是(单词,计数1,计数2),输出也是(单词,计数1,计数2)。

编辑 1:

from itertools import groupby, izip
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 2)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            counts_a, counts_b = izip((int(count_a), int(count_b)) for current_word, count_a, count_b in group)
            t1, t2 = sum(counts_a), sum(counts_b)
            print "%s%s%d%s%d" % (current_word, separator, t1, separator, t2)
        except ValueError:
            pass

这是一个Hadoop作业,所以输出是这样的:

11/11/23 18:44:21 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob:  map 100%  reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob:  map 100%  reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob:  map 100%  reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!

从日志中:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

2 个回答

0
from collections import defaultdict

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    counts = defaultdict(lambda: [0, 0])
    for word, (count1, count2) in data:
        values = counts[word]
        values[0] += count1
        values[1] += count2

    for word, (count1, count2) in counts.iteritems():
        print('{0}\t{1}\t{2}'.format(word, count1, count2))

当然可以!请把你想要翻译的内容发给我,我会帮你用简单易懂的语言解释清楚。

3

groupby

这是来自itertools模块的groupby函数,详细信息可以在这里找到。data是通过对每个元素应用itemgetter(0)(这是operator模块中的一个itemgetter类的实例,详细信息可以在这里找到)进行“分组”的。它返回一对(键结果,包含该键的元素的迭代器)。所以,每次循环时,current_word就是与一堆data行共同的“单词”(即通过itemgetter提取的第一个项目),而group是一个迭代器,遍历以该word开头的data行。根据你代码的文档,每行文件都有两个单词:一个实际的“单词”和一个计数(文本意图被解释为数字)

sum(int(count) for current_word, count in group)

字面意思就是这样:对每个在group中找到的(current_wordcount)对,计算count的整数值之和。每个group都是来自data的行集合,如上所述。所以我们取所有以current_word开头的行,将它们的字符串count值转换为整数,然后加起来。

我该如何修改这段代码,使它基本上继续执行现在的功能,但增加一个第二个计数值?也就是说,输入是(单词,计数1,计数2),输出是(单词,计数1,计数2)。

那么,你希望每个计数代表什么?你希望数据来自哪里?

我将采取我认为最简单的解释:你将修改数据文件,使每行有三个项目,并且你将分别对每列的数字进行求和。

groupby部分将保持不变,因为我们仍然以相同的方式对行进行分组,并且仍然根据“单词”进行分组。

sum部分需要计算两个值:第一列数字的总和和第二列数字的总和。

当我们遍历group时,我们将得到三项值的集合,所以我们想将它们拆分为三个值:例如current_word, group_a, group_b。对于这些,我们希望对每行的两个数字都进行整数转换。这将给我们一系列数字对;如果我们想将所有的第一个数字和所有的第二个数字相加,那么我们应该将它们变成一对数字序列。为此,我们可以使用另一个itertools函数,叫做izip。然后我们可以分别对每个序列进行求和,再将它们拆分成两个单独的数字序列变量,并进行求和。

因此:

counts_a, counts_b = izip(
    (int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
total_a, total_b = sum(counts_a), sum(counts_b)

或者我们可以通过再次使用同样的(x for y in z)技巧,简单地制作一对计数:

totals = (
    sum(counts)
    for counts in izip(
        (int(count_a), int(count_b)) for current_word, count_a, count_b in group
    )
)

尽管这个结果在打印语句中使用起来会有点棘手 :)

撰写回答