
Processing HDFS files with a Spark Streaming job: Java

I am trying to process JSON files under an HDFS directory (for testing, I use a local Windows directory).

The problem is that the existing files are never consumed, and newly added files are not detected either. Below is the code I am trying to run:

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class SparkConsumer {

        public static void main(String[] args) {
            try {
                execute();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void execute() throws InterruptedException {
            Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {

                private static final long serialVersionUID = 1L;

                @Override
                public JavaStreamingContext call() {
                    return createContext();
                }
            };

            // Recover from the checkpoint if one exists, otherwise build a new context.
            JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(
                    "hdfs://quickstart.cloudera:8020/user/data/cdx_checkpoint", createContextFunc);

            // TODO write logic to read from HDFS and merge JSONs.
            JavaPairInputDStream<LongWritable, Text> hdfsDstream = streamingContext
                    .fileStream("hdfs://quickstart.cloudera:8020/user/data",
                            LongWritable.class, Text.class, TextInputFormat.class);

            hdfsDstream.foreachRDD(new VoidFunction<JavaPairRDD<LongWritable, Text>>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(JavaPairRDD<LongWritable, Text> rdd) throws Exception {
                    System.out.println(rdd.countByValue().size());
                    rdd.checkpoint();
                }
            });

            streamingContext.start();
            streamingContext.awaitTermination();
        }

        private static JavaStreamingContext createContext() {
            final SparkConf conf = new SparkConf()
                    .setAppName("Kafka-Spark-Streaming-Application")
                    .setMaster("local[*]");
            conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            ssc.checkpoint("hdfs://quickstart.cloudera:8020/user/data/cdx_checkpoint");
            return ssc;
        }
    }

My question boils down to: how do I consume the files already in the directory while also watching for newly arriving files and processing them?
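As a point of comparison, this is a minimal sketch of the same directory-monitoring idea using `textFileStream`, the line-oriented convenience variant of `fileStream`. The class name `DirectoryWatcher` is made up for illustration, and the HDFS path is taken from the code above; note that Spark's file-based sources only pick up files that *appear* in the directory after the job starts (based on modification time), which may explain why pre-existing files are ignored:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class DirectoryWatcher {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf()
                    .setAppName("Directory-Watcher")
                    .setMaster("local[*]");
            // The file source scans the directory once per batch interval.
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

            // textFileStream builds an RDD of lines from each file whose
            // modification time falls inside the current batch window.
            JavaDStream<String> lines =
                    ssc.textFileStream("hdfs://quickstart.cloudera:8020/user/data");

            lines.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));

            ssc.start();
            ssc.awaitTermination();
        }
    }

This is only a sketch; it needs a running Spark installation and the same HDFS path to be reachable.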

The sample JSON files were copied into the HDFS directory as plain text files, but I don't see any log output related to the files being accessed.
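One common reason a file stream never sees copied-in files is how they arrive: Spark's file sources expect files to be moved into the monitored directory atomically, so they appear complete and with a fresh modification time. A sketch of that pattern (all paths hypothetical), shown here with local `mv`, which has the same atomic-rename semantics as `hdfs dfs -mv`:

    # Write the file to a staging directory outside the monitored path,
    # then rename it in; rename is atomic, so the streaming job never
    # sees a half-written file, and the file gets a fresh timestamp.
    mkdir -p /tmp/staging /tmp/monitored
    echo '{"id": 1}' > /tmp/staging/sample.json
    mv /tmp/staging/sample.json /tmp/monitored/sample.json

On HDFS the equivalent would be `hdfs dfs -put` into a staging path followed by `hdfs dfs -mv` into the watched directory, rather than copying directly into it.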


0 Answers