有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

CustomArrayWritable类MapReduce的toString()方法中的java NullPointerException

我试图使用Time_Ant10s(自定义ArrayWritable类)作为减速机的输出

我提到了这个很好的问题:MapReduce Output ArrayWritable,但是我在减缩器的最后一行中得到了context.write()中的NullPointerException

我想Time_Ant10s.toString()中的get()可能返回null,但我不知道为什么会发生这种情况。你能帮我吗

主要方法

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "something");

    // general
    job.setJarByClass(CommutingTime1.class);
    job.setMapperClass(Mapper1.class);
    job.setReducerClass(Reducer1.class);
    job.setNumReduceTasks(1);
    job.setInputFormatClass (TextInputFormat.class);

    // mapper output
    job.setMapOutputKeyClass(Date_Uid.class);
    job.setMapOutputValueClass(Time_Ant10.class);

    // reducer output
    job.setOutputFormatClass(CommaTextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Time_Ant10s.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

映射器

public static class Mapper1 extends Mapper<LongWritable, Text, Date_Uid, Time_Ant10> {
    /* map as <date_uid, time_ant10> */
    // omitted
    }
}

减速器

public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
    /* <date_uid, time_ant10> -> <date, time_ant10s> */

    private IntWritable date = new IntWritable();

    @Override
    protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {

        date.set(date_uid.getDate());

        // count ants
        int num = 0;
        for(Time_Ant10 time_ant10 : time_ant10s){
            num++;
        }

        if(num>=1){
            Time_Ant10[] temp = new Time_Ant10[num];

            int i=0;
            for(Time_Ant10 time_ant10 : time_ant10s){
                String time = time_ant10.getTimeStr();
                int ant10 = time_ant10.getAnt10();
                temp[i] = new Time_Ant10(time, ant10);
                i++;
            }

            context.write(date, new Time_Ant10s(temp));
        }
    }
}

作者

public static class CommaTextOutputFormat extends TextOutputFormat<IntWritable, Time_Ant10s> {
    @Override
    public RecordWriter<IntWritable, Time_Ant10s> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Configuration conf = job.getConfiguration();
        String extension = ".txt";
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new LineRecordWriter<IntWritable, Time_Ant10s>(fileOut, ",");
    }
}

自定义可写内容

// Time
public static class Time implements Writable { 
    private int h, m, s;

    public Time() {}

    public Time(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    public Time(String time) {
        String[] hms = time.split(":", 0);
        this.h = Integer.parseInt(hms[0]);
        this.m = Integer.parseInt(hms[1]);
        this.s = Integer.parseInt(hms[2]);
    }

    public void set(int h, int m, int s) {
        this.h = h;
        this.m = m;
        this.s = s;
    }

    public void set(String time) {
        String[] hms = time.split(":", 0);
        this.h = Integer.parseInt(hms[0]);
        this.m = Integer.parseInt(hms[1]);
        this.s = Integer.parseInt(hms[2]);
    }

    public int[] getTime() {
        int[] time = new int[3];
        time[0] = this.h;
        time[1] = this.m;
        time[2] = this.s;
        return time;
    }

    public String getTimeStr() {
        return String.format("%1$02d:%2$02d:%3$02d", this.h, this.m, this.s);
    }

    public int getTimeInt() {
        return this.h * 10000 + this.m * 100 + this.s;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        h = in.readInt();
        m = in.readInt();
        s = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(h);
        out.writeInt(m);
        out.writeInt(s);
    }
}

// Time_Ant10
public static class Time_Ant10 implements Writable { 
    private Time time;
    private int ant10;

    public Time_Ant10() {
        this.time = new Time();
    }

    public Time_Ant10(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    public Time_Ant10(String time, int ant10) {
        this.time = new Time(time);
        this.ant10 = ant10;
    }

    public void set(Time time, int ant10) {
        this.time = time;
        this.ant10 = ant10;
    }

    public void set(String time, int ant10) {
        this.time = new Time(time);
        this.ant10 = ant10;
    }

    public int[] getTime() {
        return this.time.getTime();
    }

    public String getTimeStr() {
        return this.time.getTimeStr();
    }

    public int getTimeInt() {
        return this.time.getTimeInt();
    }

    public int getAnt10() {
        return this.ant10;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        time.readFields(in);
        ant10 = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        time.write(out);
        out.writeInt(ant10);
    }
}

// Time_Ant10s
public static class Time_Ant10s extends ArrayWritable {
    public Time_Ant10s(){
        super(Time_Ant10.class);
    }

    public Time_Ant10s(Time_Ant10[] time_ant10s){
        super(Time_Ant10.class, time_ant10s);
    }

    @Override
    public Time_Ant10[] get() {
        return (Time_Ant10[]) super.get();
    }

    @Override
    public String toString() {
        int time, ant10;
        Time_Ant10[] time_ant10s = get();
        String output = "";

        for(Time_Ant10 time_ant10: time_ant10s){
            time = time_ant10.getTimeInt();
            ant10 = time_ant10.getAnt10();
            output += time + "," + ant10 + ",";
        }

        return output;
    }
}   

// Data_Uid
public static class Date_Uid implements WritableComparable<Date_Uid> { 
    // omitted
}

错误消息

java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.lang.NullPointerException
    at CommutingTime1$Time_Ant10s.toString(CommutingTime1.java:179)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.writeObject(TextOutputFormat.java:85)
    at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat$LineRecordWriter.write(TextOutputFormat.java:104)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:323)
    at CommutingTime1$Reducer1.reduce(CommutingTime1.java:291)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

共 (1) 个答案

  1. # 1 楼答案

    我发现问题是Iterable中的reduce不能重复两次。所以我引用this page并将减速机和Time_Ant10s更改如下。现在一切都很好

    @redflar3:非常感谢你给我的提示。我完全误解了我的代码有缺陷的地方

    减速器

    public static class Reducer1 extends Reducer<Date_Uid, Time_Ant10, IntWritable, Time_Ant10s> {
        private IntWritable date = new IntWritable();
    
        @Override
        protected void reduce(Date_Uid date_uid, Iterable<Time_Ant10> time_ant10s, Context context) throws IOException, InterruptedException {
            String time = "";
            int ant10;
    
            date.set(date_uid.getDate());
    
            ArrayList<Time_Ant10> temp_list = new ArrayList<Time_Ant10>();
            for (Time_Ant10 time_ant10 : time_ant10s){
                time = time_ant10.getTimeStr();
                ant10 = time_ant10.getAnt10();
                temp_list.add(new Time_Ant10(time, ant10));
            }
    
            if(temp_list.size() >= 1){
                Time_Ant10[] temp_array = temp_list.toArray(new Time_Ant10[temp_list.size()]);
                context.write(date, new Time_Ant10s(temp_array));
            }
        }
    }
    

    时间为10秒

    public static class Time_Ant10s extends ArrayWritable {
        public Time_Ant10s(){
            super(Time_Ant10.class);
        }
    
        public Time_Ant10s(Time_Ant10[] time_ant10s){
            super(Time_Ant10.class, time_ant10s);
        }
    
        @Override
        public Time_Ant10[] get() {
            return (Time_Ant10[]) super.get();
        }
    
        @Override
        public String toString() {
            int time, ant10;
            Time_Ant10[] time_ant10s = get();
            String output = "";
    
            for(Time_Ant10 time_ant10: time_ant10s){
                time = time_ant10.getTimeInt();
                ant10 = time_ant10.getAnt10();
                output += time + "," + ant10 + ",";
            }
    
            return output;
        }
    }