Java: unable to add CustomPipelineOptions by extending DataflowPipelineOptions

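I want my pipeline to have both of the following:

  • Custom user-defined pipeline options, similar to WordCountOptions in the Apache Beam WordCount example
  • The Dataflow pipeline options described in the relevant Javadoc

Currently, I extend DataflowPipelineOptions: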

public interface CustomPipelineOptions extends DataflowPipelineOptions {
    @Description("Sample parameter description")
    ValueProvider<String> getSampleParameter();
    void setSampleParameter(ValueProvider<String> sampleParameter);
    
    // more custom parameters below...
}

and pass options of type CustomPipelineOptions to run() from my main() function:

public static void main(String[] args) {

    CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
    
    // set Dataflow-specific options
    options.setProject("my-project");
    options.setRegion("my-region");
    options.setStagingLocation("gs://my-bucket/location");
    options.setTempLocation("gs://my-bucket/location");
    options.setSubnetwork("regions/my-region/subnetworks/my-subnetwork");
    options.setJobName("my-job-name");
    options.setUsePublicIps(false);
    options.setRunner(DataflowRunner.class);

    run(options);
}

(Note that above I configure various DataflowPipelineOptions options, as described in the Javadoc.)

In run(), I create the pipeline from the options of type CustomPipelineOptions:

static void run(CustomPipelineOptions options) {
    /*
     Define pipeline
     */
    Pipeline p = Pipeline.create(options);
    
    // function continues below...
}

In addition, my pom.xml file includes the following relevant dependencies (note that ${beam.version} is 2.31.0 and ${slf4j.version} is 1.7.25):

<dependencies>
    <!-- core beam SDK -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- gcp package -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- dataflowRunner -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>

    <!-- directRunner -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- slf4j; logging for java -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>

When I run the pipeline with the command:

mvn compile exec:java -Dexec.mainClass=path.to.class.myClass

or with:

mvn compile exec:java -Dexec.mainClass=path.to.class.myClass -Pdataflow-runner

I get the following errors:

Compilation failure:
[ERROR] /C:/Users/path/to/class/myClass.java:[18,40] package org.apache.beam.runners.dataflow does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[19,48] package org.apache.beam.runners.dataflow.options does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[56,52] cannot find symbol
[ERROR]   symbol:   class DataflowPipelineOptions
[ERROR]   location: class path.to.class.myClass
[ERROR] /C:/Users/path/to/class/myClass.java:[82,38] incompatible types: path.to.class.myClass.CustomPipelineOptions cannot be converted to org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[145,67] method as in class org.apache.beam.sdk.options.PipelineOptionsFactory.Builder cannot be applied to given types;

as well as:

[ERROR]   required: java.lang.Class<T>
[ERROR]   found: java.lang.Class<path.to.class.myClass.CustomPipelineOptions>
[ERROR]   reason: inference variable T has incompatible bounds
[ERROR]     equality constraints: path.to.class.myClass.CustomPipelineOptions
[ERROR]     upper bounds: org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[150,16] cannot find symbol
[ERROR]   symbol:   method setProject(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[151,16] cannot find symbol
[ERROR]   symbol:   method setRegion(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[152,16] cannot find symbol
[ERROR]   symbol:   method setStagingLocation(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[153,16] cannot find symbol
[ERROR]   symbol:   method setTempLocation(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[154,16] cannot find symbol
[ERROR]   symbol:   method setSubnetwork(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[155,16] cannot find symbol
[ERROR]   symbol:   method setJobName(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[156,16] cannot find symbol
[ERROR]   symbol:   method setUsePublicIps(boolean)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[157,27] cannot find symbol
[ERROR]   symbol:   class DataflowRunner
[ERROR]   location: class path.to.class.myClass

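I would appreciate any insight into why I am getting these errors, and into how I can have both my own pipeline options defined in CustomPipelineOptions and the Dataflow-specific options from DataflowPipelineOptions. Thanks.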


1 answer

  1. Answer #1

    The first problem is in the pom.xml file. We need to change

    <!-- dataflowRunner -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope> <!-- delete this! -->
        </dependency>
    

    to

    <!-- dataflowRunner -->
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
        </dependency>
    

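    As the Maven documentation on dependency scope states, setting the scope to runtime indicates that the dependency "is not required for compilation, but is for execution". We do need this dependency at compile time, because the code imports DataflowPipelineOptions and DataflowRunner, so we omit the scope and let it fall back to the default scope, compile. This accounts for the compilation errors above: once DataflowPipelineOptions cannot be resolved, CustomPipelineOptions is no longer recognized as a PipelineOptions and the inherited setters cannot be found.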

    The second problem is in how options is defined in main():

    public static void main(String[] args) {
    
        CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
    

    The fix is to remove withValidation():

    public static void main(String[] args) {
    
        CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);
    

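    The validate() method of PipelineOptionsValidator "validates that the passed PipelineOptions conforms to all the validation criteria from the passed in interface." Because our options are only assigned after that validation has run, the pipeline fails.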
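
If validation is still wanted, one possible approach is to run it explicitly once the setters have been called. The following is a minimal sketch, assuming Beam 2.31.0 with the Dataflow runner dependency on the compile classpath. The class name CustomOptionsSketch and the helper buildOptions are hypothetical names used only for illustration, and the option values are the placeholders from the question.

```java
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical class name, used only for this sketch.
public class CustomOptionsSketch {

    /** Custom options layered on top of the Dataflow options, as in the question. */
    public interface CustomPipelineOptions extends DataflowPipelineOptions {
        @Description("Sample parameter description")
        ValueProvider<String> getSampleParameter();
        void setSampleParameter(ValueProvider<String> sampleParameter);
    }

    /** Hypothetical helper: parse args, set Dataflow options, then validate. */
    public static CustomPipelineOptions buildOptions(String[] args) {
        // Parse the command line without validating yet.
        CustomPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);

        // Placeholder values taken from the question.
        options.setProject("my-project");
        options.setRegion("my-region");
        options.setStagingLocation("gs://my-bucket/location");
        options.setTempLocation("gs://my-bucket/location");
        options.setJobName("my-job-name");
        options.setRunner(DataflowRunner.class);

        // Validate only after the programmatic values are in place.
        return PipelineOptionsValidator.validate(CustomPipelineOptions.class, options);
    }

    public static void main(String[] args) {
        CustomPipelineOptions options = buildOptions(args);
        System.out.println("Validated options for job: " + options.getJobName());
    }
}
```

Custom options can still be supplied on the command line, for example --sampleParameter=hello, since fromArgs(args) parses them before the setters run.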