我从Hadoop教程中得到这个。它是一个reducer,它基本上从stdin接收(单词,count)对,并对它们求和。在
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
data = read_mapper_output(sys.stdin, separator=separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
total_uppercount = sum(int(count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except ValueError:
pass
现在,我希望能够接受元组(word,count1,count2),但是这个groupby
和{
编辑1:
^{pr2}$这是一个Hadoop作业,因此输出如下:
11/11/23 18:44:21 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:30 INFO streaming.StreamJob: map 100% reduce 17%
11/11/23 18:44:33 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:44:42 INFO streaming.StreamJob: map 100% reduce 12%
11/11/23 18:44:45 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:44:51 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:44:54 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:44:57 INFO streaming.StreamJob: map 100% reduce 0%
11/11/23 18:45:05 INFO streaming.StreamJob: map 100% reduce 2%
11/11/23 18:45:06 INFO streaming.StreamJob: map 100% reduce 8%
11/11/23 18:45:08 INFO streaming.StreamJob: map 100% reduce 7%
11/11/23 18:45:09 INFO streaming.StreamJob: map 100% reduce 3%
11/11/23 18:45:12 INFO streaming.StreamJob: map 100% reduce 100%
...
11/11/23 18:45:12 ERROR streaming.StreamJob: Job not Successful!
从日志中:
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:311)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:545)
at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:137)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:473)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
at org.apache.hadoop.mapred.Child.main(Child.java:170)
Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out.
这是来自
itertools
模块的groupby
函数,记录了here。data
是根据对每个元素应用itemgetter(0)
(来自operator
模块的itemgetter
类的实例)对每个元素进行“分组”。它返回对(键结果,具有该键的元素上的迭代器)。因此,每次循环,current_word
都是一堆data
行(index-0,即第一项,由itemgetter
提取的)共有的“单词”,而group
是以该word
开头的data
行上的迭代器。如代码文档中所述,文件的每一行都有两个单词:一个实际的“单词”和一个count(文本被解释为数字)这个的意思正是它所说的:在
group
中找到的每个(current_word
,count
)对的整数值之和。每个group
都是来自data
的一组行,如上所述。因此,我们取所有以current_word
开头的行,将它们的字符串count
值转换为整数,并将它们相加。在好吧,你希望每个计数代表什么,你希望数据从哪里来?在
我将采用我认为最简单的解释:将数据文件修改为每行有三项,并分别从每列数字中取和。在
groupby
将是相同的,因为我们仍然以相同的方式对得到的行进行分组,并且仍然根据“单词”对它们进行分组。在sum
部分需要计算两个值:第一列数字的总和和第二列数字的总和。在当我们迭代
group
时,我们将得到三个值的集合,因此我们希望将它们解压为三个值:current_word, group_a, group_b
例如。对于其中的每一个,我们希望对每行上的两个数字应用整数转换。这给了我们一个数对的序列;如果我们想把所有的第一个数和所有的第二个数相加,那么我们应该生成一对数字序列。为此,我们可以使用另一个itertools
函数,名为izip
。然后,我们可以分别求和这些变量,方法是将它们重新解压成两个独立的数字序列变量,然后求和。在因此:
或者,我们也可以用同样的方法(x代表z中的y)来做一对计数:
^{pr2}$尽管在print语句中使用这个结果会有些困难:)
相关问题 更多 >
编程相关推荐