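Java: cannot add CustomPipelineOptions by extending DataflowPipelineOptions
I want to be able to set both of the following:
- custom user-defined pipeline options, similar to WordCountOptions in the Apache Beam WordCount example
- the Dataflow pipeline options described in the relevant javadoc
Right now, when I try to extend DataflowPipelineOptions: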
public interface CustomPipelineOptions extends DataflowPipelineOptions {
    @Description("Sample parameter description")
    ValueProvider<String> getSampleParameter();
    void setSampleParameter(ValueProvider<String> sampleParameter);
    // more custom parameters below...
}
and pass options of type CustomPipelineOptions to run() from my main() function:
public static void main(String[] args) {
    CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
    // set Dataflow-specific options
    options.setProject("my-project");
    options.setRegion("my-region");
    options.setStagingLocation("gs://my-bucket/location");
    options.setTempLocation("gs://my-bucket/location");
    options.setSubnetwork("regions/my-region/subnetworks/my-subnetwork");
    options.setJobName("my-job-name");
    options.setUsePublicIps(false);
    options.setRunner(DataflowRunner.class);
    run(options);
}
(Note that above I configure various DataflowPipelineOptions options, as described in the javadoc.)
In run(), I create the pipeline using the options of type CustomPipelineOptions:
static void run(CustomPipelineOptions options) {
    /*
    Define pipeline
    */
    Pipeline p = Pipeline.create(options);
    // function continues below...
}
In addition, I include the following relevant dependencies in my pom.xml file (note that ${beam.version} is 2.31.0 and ${slf4j.version} is 1.7.25):
<dependencies>
  <!-- core beam SDK -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- gcp package -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- dataflowRunner -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>${beam.version}</version>
    <scope>runtime</scope>
  </dependency>
  <!-- directRunner -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>${beam.version}</version>
  </dependency>
  <!-- slf4j; logging for java -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
</dependencies>
When I run the pipeline with the command:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass
or with:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass -Pdataflow-runner
I get the following errors:
Compilation failure:
[ERROR] /C:/Users/path/to/class/myClass.java:[18,40] package org.apache.beam.runners.dataflow does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[19,48] package org.apache.beam.runners.dataflow.options does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[56,52] cannot find symbol
[ERROR] symbol: class DataflowPipelineOptions
[ERROR] location: class path.to.class.myClass
[ERROR] /C:/Users/path/to/class/myClass.java:[82,38] incompatible types: path.to.class.myClass.CustomPipelineOptions cannot be converted to org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[145,67] method as in class org.apache.beam.sdk.options.PipelineOptionsFactory.Builder cannot be applied to given types;
as well as:
[ERROR] required: java.lang.Class<T>
[ERROR] found: java.lang.Class<path.to.class.myClass.CustomPipelineOptions>
[ERROR] reason: inference variable T has incompatible bounds
[ERROR] equality constraints: path.to.class.myClass.CustomPipelineOptions
[ERROR] upper bounds: org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[150,16] cannot find symbol
[ERROR] symbol: method setProject(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[151,16] cannot find symbol
[ERROR] symbol: method setRegion(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[152,16] cannot find symbol
[ERROR] symbol: method setStagingLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[153,16] cannot find symbol
[ERROR] symbol: method setTempLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[154,16] cannot find symbol
[ERROR] symbol: method setSubnetwork(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[155,16] cannot find symbol
[ERROR] symbol: method setJobName(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[156,16] cannot find symbol
[ERROR] symbol: method setUsePublicIps(boolean)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[157,27] cannot find symbol
[ERROR] symbol: class DataflowRunner
[ERROR] location: class path.to.class.myClass
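I would appreciate any help understanding why I get these errors, and how to reach my goal of having both my own pipeline options, defined in CustomPipelineOptions, and the Dataflow-specific options from DataflowPipelineOptions. Thanks.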
# Answer 1
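The first problem is in the pom.xml file: the beam-runners-google-cloud-dataflow-java dependency is declared with <scope>runtime</scope>, and that scope has to go. As the dependency scope section of the Maven documentation states, setting the scope to runtime indicates that the dependency "is not required for compilation, but is for execution". We need this dependency at compile time, because the code references DataflowPipelineOptions and DataflowRunner directly. Omitting the scope makes Maven fall back to the default scope, compile, which resolves the "package does not exist" and "cannot find symbol" errors.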
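As a sketch of that change, assuming the rest of the pom.xml stays exactly as in the question, the Dataflow runner dependency would look like this once the <scope> element is removed:

```xml
<!-- dataflowRunner: no <scope> element, so Maven applies the default scope (compile) -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>${beam.version}</version>
</dependency>
```

Writing <scope>compile</scope> explicitly should be equivalent, since compile is the default.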
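The second problem is in how options is defined in main(). The fix is to drop withValidation(): the validate() method of PipelineOptionsValidator "Validates that the passed PipelineOptions conforms to all the validation criteria from the passed in interface." Because our options are only assigned through the setters after that validation has already run, the pipeline fails.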
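The ordering issue can be illustrated with a simplified, stdlib-only model. This is not Beam's implementation: the ValidationOrderDemo class, its Options holder, and the choice of project and region as the required fields are all hypothetical, picked only to show why validating before the setters run fails while validating afterwards passes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for "validate, then assign" versus "assign, then validate".
// All names here are hypothetical; none of this is Beam API.
class ValidationOrderDemo {

    /** Hypothetical options holder; both fields are treated as required. */
    static class Options {
        private String project;
        private String region;

        void setProject(String project) { this.project = project; }
        void setRegion(String region) { this.region = region; }
        String getProject() { return project; }
        String getRegion() { return region; }
    }

    /** Returns the names of the required fields that are still unset. */
    static List<String> missingRequired(Options options) {
        List<String> missing = new ArrayList<>();
        if (options.getProject() == null) {
            missing.add("project");
        }
        if (options.getRegion() == null) {
            missing.add("region");
        }
        return missing;
    }

    /** Throws if any required field is unset, mimicking a validation step. */
    static void validate(Options options) {
        List<String> missing = missingRequired(options);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
    }

    public static void main(String[] args) {
        Options options = new Options();

        // Validating at creation time: nothing has been assigned yet, so this throws.
        try {
            validate(options);
        } catch (IllegalArgumentException e) {
            System.out.println("Early validation failed: " + e.getMessage());
        }

        // Assign first, then validate: this passes.
        options.setProject("my-project");
        options.setRegion("my-region");
        validate(options);
        System.out.println("Validation after assignment passed");
    }
}
```

In Beam itself, if validation is still wanted, one option is probably to call PipelineOptionsValidator.validate(CustomPipelineOptions.class, options) after the setters instead of using withValidation() at creation time; alternatively, the values can be passed as command-line arguments so they are already present when withValidation() runs.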