有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java如何在Mapreduce程序中两次遍历文本值的迭代器?

在我的MapReduce程序中,我有一个reducer函数,它计算文本值迭代器中的项数,然后为迭代器中的每个项输出项作为键,计数作为值。因此,我需要使用迭代器两次。但是一旦迭代器到达末尾,我就无法从第一个开始进行迭代。我如何解决这个问题? 我为我的reduce函数尝试了以下代码:

   public static class ReduceA extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{

        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text>output, Reporter reporter) throws IOException 
        {
            Text t;
            int count =0;                
            String[] attr = key.toString().split(",");      

           while(values.hasNext())               
            {
                values.next();
                count++;

            }

           //Maybe i need to reset my iterator here and start from the beginning but how do i do it?

           String v=Integer.toString(count);
           while(values.hasNext())               
            {
                t=values.next();

                output.collect(t,new Text(v));
            }
        }  
      }

上面的代码生成了空结果。我曾尝试在列表中插入迭代器的值,但由于我需要处理大量GBs的数据,因此使用该列表会导致java堆空间错误。请帮助我修改代码,以便我可以遍历迭代器两次


共 (1) 个答案

  1. # 1 楼答案

    您总是可以用简单的方法来完成:声明一个列表并在第一次迭代时缓存该值。因此,您可以遍历列表并写出输出。你应该有类似的东西:

    public static class ReduceA extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {
    
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        Text t;
        int count = 0;
        String[] attr = key.toString().split(",");
        List<Text> cache = new ArrayList<Text>();
    
        while (values.hasNext()) {
            cache.add(values.next());
            count++;
    
        }
    
        // Maybe i need to reset my iterator here and start from the beginning
        // but how do i do it?
    
        String v = Integer.toString(count);
        for (Text text : cache) {
            output.collect(text, new Text(v));
        }
    }
    }