Apache Beam代码中的java语法错误插入
我正在编写用于测试的Apache beam代码。请参考下面的代码。我创建了示例SimpleFunction,并应用代码并尝试编译
public static void main(String[] args) throws IOException {
System.out.println("Test Log");
PipelineOptions options = PipelineOptionsFactory.create();
//options.setRunner(SparkRunner.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches())
.apply(MapElements
// uses imports from TypeDescriptors
.via(
new SimpleFunction <ReadableFile, KV<String,String>>() {
private static final long serialVersionUID = -7867677L;
@SuppressWarnings("unused")
public KV<String,String> createKV(ReadableFile f) {
String temp = null;
try{
temp = f.readFullyAsUTF8String();
}catch(IOException e){
}
return KV.of(f.getMetadata().resourceId().toString(), temp);
}
@Override
public String apply(ReadableFile element, KV<String, String> input) {
StringBuilder str = new StringBuilder();
return str.toString();
}
}
))
.apply(FileIO.write());
p.run();
}
但编译器正在抛出syntax error at public String apply(ReadableFile
我试图解决这个问题,但没有成功,有人能指导我吗
# 1 楼答案
SimpleFunction<InputT, OutputT>
取InputT
的值并返回OutputT
的值。本例中apply
的签名为OutputT apply(InputT input);
,请参见here对于您的类型,
SimpleFunction
必须如下所示:例如,看看它是如何使用的here
在您的例子中,您需要更多关于
readMatches()
的逻辑,请参见here以了解如何将其应用于解析Avros,而this是该代码中PTransform
的实现细节