Processing HDFS files with a Spark Streaming job: Java
I am trying to process JSON files in an HDFS directory (for testing, I am using a local Windows directory).
The problem is that the existing files are never consumed and new files are not detected either. Below is the code I am trying to run:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkConsumer {

    public static void main(String[] args) {
        try {
            execute();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void execute() throws InterruptedException {
        // Context factory for getOrCreate: only invoked when no checkpoint exists.
        Function0<JavaStreamingContext> createContextFunc = new Function0<JavaStreamingContext>() {
            private static final long serialVersionUID = 1L;

            @Override
            public JavaStreamingContext call() {
                return createContext();
            }
        };

        JavaStreamingContext streamingContext = JavaStreamingContext.getOrCreate(
                "hdfs://quickstart.cloudera:8020/user/data/cdx_checkpoint", createContextFunc);

        // TODO write logic to read from HDFS and Merge JSON's.
        JavaPairInputDStream<LongWritable, Text> hdfsDstream = streamingContext.fileStream(
                "hdfs://quickstart.cloudera:8020/user/data",
                LongWritable.class, Text.class, TextInputFormat.class);

        hdfsDstream.foreachRDD(new VoidFunction<JavaPairRDD<LongWritable, Text>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaPairRDD<LongWritable, Text> rdd) throws Exception {
                System.out.println(rdd.countByValue().size());
                rdd.checkpoint();
            }
        });

        streamingContext.start();
        streamingContext.awaitTermination();
    }

    private static JavaStreamingContext createContext() {
        final SparkConf conf = new SparkConf()
                .setAppName("Kafka-Spark-Streaming-Application")
                .setMaster("local[*]");
        conf.set("spark.streaming.receiver.writeAheadLog.enable", "true");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
        ssc.checkpoint("hdfs://quickstart.cloudera:8020/user/data/cdx_checkpoint");
        return ssc;
    }
}
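For the "Merge JSON's" TODO above, here is a minimal, hypothetical sketch of what that step could look like. It assumes each input file holds one JSON object per line (the `Text` values of a batch); the class name `JsonMerge` and the method are my own illustration, not part of the original code, and in the streaming job the method would be applied to the collected values inside foreachRDD.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch of the "merge JSON's" step: wrap one-object-per-line
// JSON records from a batch into a single JSON array string.
public class JsonMerge {

    public static String mergeToArray(List<String> jsonLines) {
        // StringJoiner emits "[" + records joined by "," + "]".
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String line : jsonLines) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                joiner.add(trimmed);
            }
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("{\"id\":1}", "{\"id\":2}");
        System.out.println(mergeToArray(lines)); // → [{"id":1},{"id":2}]
    }
}
```

Blank lines are skipped so trailing newlines in the source files do not produce empty array entries.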
My question boils down to: how do I consume the files already in the directory, while also watching for the newest files and processing them?
The sample JSON files were copied into the HDFS directory as plain text files, but I see nothing in the logs suggesting the files are even being accessed.
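One likely reason nothing is picked up: Spark's file streams only process files that appear in the monitored directory, with a fresh modification time, after the stream has started; a file that is copied in place (created, then written to) can be observed half-written or skipped. The usual fix is to write the file elsewhere and then move it into the directory in one atomic step. Below is a small sketch of that pattern for the local-directory test mentioned above; the class name and paths are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: stage a file outside the monitored directory, then
// move it in atomically so the stream never sees a half-written file.
public class AtomicDrop {

    public static Path dropInto(Path stagingFile, Path watchedDir) throws IOException {
        Path target = watchedDir.resolve(stagingFile.getFileName());
        // ATOMIC_MOVE makes the file appear in the watched directory in a
        // single step (requires source and target on the same filesystem).
        return Files.move(stagingFile, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path staging = Files.createTempDirectory("staging");
        Path watched = Files.createTempDirectory("watched");
        Path f = Files.write(staging.resolve("sample.json"), "{\"id\":1}".getBytes());
        Path moved = dropInto(f, watched);
        System.out.println(Files.exists(moved)); // → true
    }
}
```

On HDFS the equivalent is to write into a temporary directory and use a rename into the monitored directory, since HDFS renames are atomic.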
0 answers