python中Groupby和list理解的头痛

2024-09-28 22:34:00 发布

您现在位置:Python中文网/ 问答频道 /正文

我从Hadoop教程中得到这个。它是一个reducer,它基本上从stdin接收(单词,count)对,并对它们求和。在

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_uppercount = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            pass

现在,我希望能够接受元组(word,count1,count2),但是这个groupby和{}业务对我来说是完全难以理解的。我现在怎么用第二个值来修改它呢? 一、 输入是(字,count1,count2),输出是(word,count1,count2)。在

编辑1:

^{pr2}$

这是一个Hadoop作业,因此输出如下:

11/11/23 18:44:21 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob:  map 100%  reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob:  map 100%  reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob:  map 100%  reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob:  map 100%  reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob:  map 100%  reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob:  map 100%  reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob:  map 100%  reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob:  map 100%  reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!

从日志中:

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
    at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)

Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.

Tags: orginfohadoopmapreduceapachejavaat
2条回答
from collections import defaultdict

def main(separator='\t'):
    data = read_mapper_output(sys.stdin, separator=separator)
    counts = defaultdict(lambda: [0, 0])
    for word, (count1, count2) in data:
        values = counts[word]
        values[0] += count1
        values[1] += count2

    for word, (count1, count2) in counts.iteritems():
        print('{0}\t{1}\t{2}'.format(word, count1, count2))

groupby

这是来自itertools模块的groupby函数,记录了heredata是根据对每个元素应用itemgetter(0)(来自operator模块的itemgetter类的实例)对每个元素进行“分组”。它返回对(键结果,具有该键的元素上的迭代器)。因此,每次循环,current_word都是一堆data行(index-0,即第一项,由itemgetter提取的)共有的“单词”,而group是以该word开头的data行上的迭代器。如代码文档中所述,文件的每一行都有两个单词:一个实际的“单词”和一个count(文本被解释为数字)

sum(int(count) for current_word, count in group)

这个的意思正是它所说的:在group中找到的每个(current_wordcount)对的整数值之和。每个group都是来自data的一组行,如上所述。因此,我们取所有以current_word开头的行,将它们的字符串count值转换为整数,并将它们相加。在

How do I modify this chunk so it basically continues doing what it does right now, but with a second counter value? I.e. input is (word, count1, count2) and output is (word, count1, count2).

好吧,你希望每个计数代表什么,你希望数据从哪里来?在

我将采用我认为最简单的解释:将数据文件修改为每行有三项,并分别从每列数字中取和。在

groupby将是相同的,因为我们仍然以相同的方式对得到的行进行分组,并且仍然根据“单词”对它们进行分组。在

sum部分需要计算两个值:第一列数字的总和和第二列数字的总和。在

当我们迭代group时,我们将得到三个值的集合,因此我们希望将它们解压为三个值:current_word, group_a, group_b例如。对于其中的每一个,我们希望对每行上的两个数字应用整数转换。这给了我们一个数对的序列;如果我们想把所有的第一个数和所有的第二个数相加,那么我们应该生成一对数字序列。为此,我们可以使用另一个itertools函数,名为izip。然后,我们可以分别求和这些变量,方法是将它们重新解压成两个独立的数字序列变量,然后求和。在

因此:

counts_a, counts_b = izip(
    (int(count_a), int(count_b)) for current_word, count_a, count_b in group
)
total_a, total_b = sum(counts_a), sum(counts_b)

或者,我们也可以用同样的方法(x代表z中的y)来做一对计数:

^{pr2}$

尽管在print语句中使用这个结果会有些困难:)

相关问题 更多 >