maven: How to use a custom JDBC JAR file stored in GCS with the Apache Beam Java SDK

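I have a use case where I read a file from GCS and write it to our own data warehouse product through Apache Beam. We have a custom JDBC driver (.jar) for connecting to the warehouse, and I am trying to use Apache Beam's JdbcIO to perform the ETL, with a Maven pom managing the dependencies. Can someone help me understand how to make use of this custom JAR file in Apache Beam?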


p.apply(JdbcIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "MYDRIVERCLASS", "DATABASE_URL")
        .withUsername("username")
        .withPassword("password"))
    .withQuery("select id,name from Person")
    .withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of()))
    .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
      public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
        return KV.of(resultSet.getInt(1), resultSet.getString(2));
      }
    })
);

2 answers

  1. # Answer 1

    You can use this example code in your own code, and see from it how JdbcIO is used:

    @Experimental(Experimental.Kind.SOURCE_SINK)
    public class JdbcIO {
      /**
       * Read data from a JDBC datasource.
       *
       * @param <T> Type of the data to be read.
       */
      public static <T> Read<T> read() {
        return new AutoValue_JdbcIO_Read.Builder<T>().build();
      }
    
      /**
       * Like {@link #read}, but executes multiple instances of the query substituting each element
       * of a {@link PCollection} as query parameters.
       *
       * @param <ParameterT> Type of the data representing query parameters.
       * @param <OutputT> Type of the data to be read.
       */
      public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
        return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>().build();
      }
    
      /**
       * Write data to a JDBC datasource.
       *
       * @param <T> Type of the data to be written.
       */
      public static <T> Write<T> write() {
        return new AutoValue_JdbcIO_Write.Builder<T>().build();
      }
    
      private JdbcIO() {}
    
      /**
       * An interface used by {@link JdbcIO.Read} for converting each row of the {@link ResultSet} into
       * an element of the resulting {@link PCollection}.
       */
      @FunctionalInterface
      public interface RowMapper<T> extends Serializable {
        T mapRow(ResultSet resultSet) throws Exception;
      }
    
      /**
       * A POJO describing a {@link DataSource}, either providing directly a {@link DataSource} or all
       * properties allowing to create a {@link DataSource}.
       */
      @AutoValue
      public abstract static class DataSourceConfiguration implements Serializable {
        @Nullable abstract String getDriverClassName();
        @Nullable abstract String getUrl();
        @Nullable abstract String getUsername();
        @Nullable abstract String getPassword();
        @Nullable abstract String getConnectionProperties();
        @Nullable abstract DataSource getDataSource();
    
        abstract Builder builder();
    
        @AutoValue.Builder
        abstract static class Builder {
          abstract Builder setDriverClassName(String driverClassName);
          abstract Builder setUrl(String url);
          abstract Builder setUsername(String username);
          abstract Builder setPassword(String password);
          abstract Builder setConnectionProperties(String connectionProperties);
          abstract Builder setDataSource(DataSource dataSource);
          abstract DataSourceConfiguration build();
        }
    
        public static DataSourceConfiguration create(DataSource dataSource) {
          checkArgument(dataSource != null, "dataSource can not be null");
          checkArgument(dataSource instanceof Serializable, "dataSource must be Serializable");
          return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
              .setDataSource(dataSource)
              .build();
        }
    
        public static DataSourceConfiguration create(String driverClassName, String url) {
          checkArgument(driverClassName != null, "driverClassName can not be null");
          checkArgument(url != null, "url can not be null");
          return new AutoValue_JdbcIO_DataSourceConfiguration.Builder()
              .setDriverClassName(driverClassName)
              .setUrl(url)
              .build();
        }
    
        public DataSourceConfiguration withUsername(String username) {
          return builder().setUsername(username).build();
        }
    
        public DataSourceConfiguration withPassword(String password) {
          return builder().setPassword(password).build();
        }
    
        /**
    

    As this example shows, you can build and run the project. See the documentation for more details.

    # Build the project.
    gradle('build')
     
    # Check the generated build files.
    run('ls -lh build/libs/')
     
    # Run the shadow (fat jar) build.
    gradle('runShadow')
     
    # Sample the first 20 results, remember there are no ordering guarantees.
    run('head -n 20 outputs/part-00000-of-*')
    
  2. # Answer 2

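    To use additional dependency JARs, simply add them to the classpath when you run the Beam Java pipeline. All JARs on the classpath should be staged by the Beam runner.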
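
    Before submitting the pipeline, it can help to confirm that the custom driver really is on the classpath. The sketch below uses only the JDK, not Beam: it lists the JAR entries of a classpath string and checks whether a driver class can be loaded. The class name `ClasspathJars` and its methods are hypothetical helpers, and `MYDRIVERCLASS` is the placeholder from the question.

    ```java
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Pattern;

    /** Hypothetical helper: inspects the classpath for JARs and checks that a driver class loads. */
    final class ClasspathJars {

      /** Splits a classpath string on the platform separator and keeps only entries ending in .jar. */
      static List<String> jarEntries(String classpath) {
        List<String> jars = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
          if (entry.toLowerCase().endsWith(".jar")) {
            jars.add(entry);
          }
        }
        return jars;
      }

      /** Returns true if the named class can be loaded (without initializing it). */
      static boolean isLoadable(String className) {
        try {
          Class.forName(className, false, ClasspathJars.class.getClassLoader());
          return true;
        } catch (ClassNotFoundException | LinkageError e) {
          return false;
        }
      }

      public static void main(String[] args) {
        // Pass the real driver class name as the first argument;
        // "MYDRIVERCLASS" is only the placeholder used in the question.
        String driver = args.length > 0 ? args[0] : "MYDRIVERCLASS";
        System.out.println("Driver loadable: " + isLoadable(driver));
        jarEntries(System.getProperty("java.class.path")).forEach(System.out::println);
      }
    }
    ```

    If the driver JAR is missing from the printed list, the dependency is not reaching the classpath, and JdbcIO would not be able to load the driver class either.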

    You can also specify the dependencies with this PipelineOption.
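
    The link behind the option name did not survive, so the following is a sketch under the assumption that it refers to Beam's `filesToStage` option, which takes a list of files and is written as a comma-separated value on the command line. The helper class `FilesToStageArg` and the file paths are hypothetical; only the JDK is used.

    ```java
    import java.util.List;

    /** Hypothetical helper: builds a --filesToStage argument from a list of local file paths. */
    final class FilesToStageArg {

      /** Joins the paths into one comma-separated --filesToStage=... argument. */
      static String build(List<String> paths) {
        if (paths.isEmpty()) {
          throw new IllegalArgumentException("at least one file is required");
        }
        for (String path : paths) {
          // A comma inside a path would be read as a separator between list items.
          if (path.contains(",")) {
            throw new IllegalArgumentException("path must not contain a comma: " + path);
          }
        }
        return "--filesToStage=" + String.join(",", paths);
      }

      public static void main(String[] args) {
        // Both paths are made-up examples: the pipeline's own JAR and the custom JDBC driver JAR.
        System.out.println(build(List.of(
            "target/my-pipeline-bundled.jar",
            "lib/my-warehouse-jdbc.jar")));
      }
    }
    ```

    As far as I know, setting this option explicitly replaces the runner's automatic classpath detection, so the list would need to include the pipeline's own JAR as well as the driver JAR.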