有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Apache Beam代码中的java语法错误插入

我正在编写用于测试的Apache beam代码。请参考下面的代码。我创建了示例SimpleFunction,并应用代码并尝试编译

    public static void main(String[] args) throws IOException {
        System.out.println("Test Log");

        PipelineOptions options = PipelineOptionsFactory.create();
        //options.setRunner(SparkRunner.class);
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            .apply(MapElements
                // uses imports from TypeDescriptors
                .via(
                    new SimpleFunction <ReadableFile, KV<String,String>>() {

                        private static final long serialVersionUID = -7867677L;

                        @SuppressWarnings("unused")
                        public KV<String,String> createKV(ReadableFile f) {
                            String temp = null;
                            try{
                            temp = f.readFullyAsUTF8String();
                            }catch(IOException e){

                            }
                            return KV.of(f.getMetadata().resourceId().toString(), temp);
                        }

                        @Override
                        public String apply(ReadableFile element, KV<String, String> input) {
                            StringBuilder str = new StringBuilder();
                            return str.toString();
                        }
                    }

            ))
            .apply(FileIO.write());
        p.run();
    }

但编译器正在抛出syntax error at public String apply(ReadableFile

我试图解决这个问题,但没有成功,有人能指导我吗


共 (1) 个答案

  1. # 1 楼答案

    SimpleFunction<InputT, OutputT>InputT的值并返回OutputT的值。本例中apply的签名为OutputT apply(InputT input);,请参见here

    对于您的类型,SimpleFunction必须如下所示:

    new SimpleFunction <ReadableFile, KV<String,String>>() {
       ...
       @Override
       public KV<String,String> apply(ReadableFile input) {
          ...
       }
    }
    

    例如,看看它是如何使用的here

    在您的例子中,您需要更多关于readMatches()的逻辑,请参见here以了解如何将其应用于解析Avros,而this是该代码中PTransform的实现细节