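Java Apache Beam for Google Cloud Dataflow: 404 error when using BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED

I'm running into a problem when I run my Beam Dataflow pipeline (written in Java) to load data into Google BigQuery. It happens in my development environment and also in production.

I'm processing multiple data items through a Beam pipeline on a separate thread. The processing is done in a ParDo, which uses apply on a PCollection to transform the received data. I then try to write the transformed data to Google BigQuery.

I'm trying to write the transformed data to BigQuery with a routine like this: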

transformedData
   .apply("Load fact data",
         BigQueryIO.<ValidatedDataRecord>write()
         .to(new LoadDataFact.DynamicFactTableDestination(dataType.label))
         .withFormatFunction(new LoadDataFact.FactSerializationFn())
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

The important part here is that I'm using the following:

.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)

...because I want a new table to be created if it doesn't already exist.

The problem is that when new data becomes part of the stream and that data needs a new dataset, I often (but not always) get the following 404 error:

Exception thrown in class : com.myOrg.myPackage.myClass Error : java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : "Not found: Table my-project:my-dataset.my-table",
"reason" : "notFound"
} ],
"message" : "Not found: Table my-project:my-dataset.my-table",
"status" : "NOT_FOUND"
}

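I've looked into what happens in the underlying Beam Java SDK. When the first new data item is processed, CreateTables.class should try to create the new table and, if that succeeds, add an entry for it to the static CreateTables.createdTables collection to record that the table has been created. The 404 error seems to occur after the table has been created (although sometimes the table isn't fully created). I'm not sure what causes this behavior or what triggers the 404 error; the error message doesn't give much information. In BigQuery, this kind of error usually means that the table doesn't exist at the point where it is being accessed.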

CreateTables.class is part of the following Beam SDK library (the Google Cloud Platform IO module, not the core SDK jar):

C:\Users\my.username\.m2\repository\org\apache\beam\beam-sdks-java-io-google-cloud-platform\2.5.0\beam-sdks-java-io-google-cloud-platform-2.5.0.jar!\org\apache\beam\sdk\io\gcp\bigquery\CreateTables.class
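To make the "create once, then remember" behavior described above concrete, here is a minimal, stdlib-only model of it. This is not Beam's CreateTables source: the class `TableCreationCache`, the method `ensureTable`, and the `Predicate<String>` standing in for the real BigQuery create call are all hypothetical. What the model shows is that once a table spec is in the static set, later elements in the same JVM skip the create step entirely, so the set records what this worker believes was created rather than re-checking BigQuery.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/**
 * Hypothetical, simplified model of a per-JVM "create table once" cache.
 * NOT Beam's CreateTables implementation; all names are illustrative only.
 */
final class TableCreationCache {
    // Static, so it is shared by every caller in the same JVM (i.e. per worker).
    private static final Set<String> createdTables = ConcurrentHashMap.newKeySet();

    /**
     * Attempts to create the table at most once per JVM.
     *
     * @param tableSpec   e.g. "my-project:my-dataset.my-table"
     * @param createTable stand-in for the real create call; returns true on success
     * @return true if a create attempt was made, false if the cache short-circuited it
     */
    public static boolean ensureTable(String tableSpec, Predicate<String> createTable) {
        if (createdTables.contains(tableSpec)) {
            return false; // cached: the table is assumed to exist, nothing is re-checked
        }
        synchronized (createdTables) {
            // Re-check under the lock so two threads do not both attempt a create.
            if (createdTables.contains(tableSpec)) {
                return false;
            }
            if (createTable.test(tableSpec)) {
                createdTables.add(tableSpec); // remembered only when creation succeeded
            }
            return true;
        }
    }

    /** Test helper: clears the cache. */
    static void reset() {
        createdTables.clear();
    }
}
```

In this model a failed create is not cached, so the next element for the same table tries again, while a successful create is never revisited.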

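I've raised a bug on the Beam Jira: https://issues.apache.org/jira/browse/BEAM-7195

I tried updating my Beam SDK to v2.12.0, but for some reason the pipeline then stopped working completely.

Is there a way to work around this? I use a custom class that extends the DynamicDestinations class Beam uses for routing (this is how the documentation says dynamic destinations should be implemented; see https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-dynamic-destinations).

My custom class is shown below. The getTable method is called while each data item is processed, to determine which table the data should be written to: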

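import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class LoadDataFact {

    // Routes each ValidatedDataRecord to a table in the dataset named by its DatasetName field.
    public static class DynamicFactTableDestination extends DynamicDestinations<ValidatedDataRecord, String> {
        private static final long serialVersionUID = -1234561111111123456L;
        private static final String projectID = "my-project";
        private String dataType = "none";
        private String elementDuration = "unknown";

        public DynamicFactTableDestination(String dataType) {
            this.dataType = dataType;
        }

        // The destination key is the dataset name carried on each record.
        @Override
        public String getDestination(ValueInSingleWindow<ValidatedDataRecord> element) {
            return element.getValue().DatasetName;
        }

        // Maps a destination (dataset name) to my-project:<dataset>.data_for_<dataType>,
        // partitioned by day on created_date_time.
        @Override
        public TableDestination getTable(String destination) {
            try {
                return new TableDestination(new TableReference()
                        .setProjectId(projectID)
                        .setDatasetId(destination)
                        .setTableId(String.format("data_for_%s", this.dataType)), "Data staging table",
                        new TimePartitioning()
                                .setType("DAY")
                                .setField("created_date_time"));
            }
            catch (Exception ex) {
                // Rethrow instead of returning null, so a failure surfaces here
                // rather than as a null TableDestination further downstream.
                throw new RuntimeException("Could not build TableDestination for " + destination, ex);
            }
        }

        // getSchema(String destination), which DynamicDestinations also requires, is not shown in this post.
    }

    // FactSerializationFn (used in the write above) is not shown in this post.
}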
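The routing performed by getDestination and getTable can be modeled without Beam as a pure function from a record's DatasetName to a table spec. This is a hypothetical stdlib sketch: `FactRecord` stands in for ValidatedDataRecord, `FactTableRouting` is an illustrative name, and the "project:dataset.table" string has the same shape as the table name in the 404 message. It shows that every distinct DatasetName yields a different table spec, so a record with a previously unseen DatasetName is routed to a table this worker has not written to before.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for ValidatedDataRecord: only the field used for routing. */
final class FactRecord {
    final String datasetName;

    FactRecord(String datasetName) {
        this.datasetName = datasetName;
    }
}

/** Stdlib model of the routing in DynamicFactTableDestination (illustrative only). */
final class FactTableRouting {
    static final String PROJECT_ID = "my-project";

    // Mirrors getDestination: the destination key is the record's dataset name.
    static String destination(FactRecord r) {
        return r.datasetName;
    }

    // Mirrors the TableReference built in getTable: project and table id are fixed
    // for a given dataType, only the dataset varies.
    static String tableSpec(String destination, String dataType) {
        return String.format("%s:%s.data_for_%s", PROJECT_ID, destination, dataType);
    }

    // Distinct table specs a batch of records would be written to, in first-seen order.
    static Set<String> tablesFor(List<FactRecord> records, String dataType) {
        Set<String> specs = new LinkedHashSet<>();
        for (FactRecord r : records) {
            specs.add(tableSpec(destination(r), dataType));
        }
        return specs;
    }
}
```

For example, records with DatasetName values "ds_a", "ds_b", "ds_a" and a dataType of "sales" fan out to two tables, my-project:ds_a.data_for_sales and my-project:ds_b.data_for_sales.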

Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.myOrg.myNamespace</groupId>
  <artifactId>my-artifact-name</artifactId>
  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.5.0</beam.version>
    <maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
   <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
        <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
        <plugin>
           <groupId>org.eclipse.m2e</groupId>
           <artifactId>lifecycle-mapping</artifactId>
           <version>1.0.0</version>
           <configuration>
              <lifecycleMappingMetadata>
                 <pluginExecutions>
                    <pluginExecution>
                       <pluginExecutionFilter>
                          <groupId>
                             org.apache.maven.plugins
                          </groupId>
                          <artifactId>
                             maven-compiler-plugin
                          </artifactId>
                          <versionRange>
                             [@maven-compiler-plugin.version@,)
                          </versionRange>
                          <goals>
                             <goal>compile</goal>
                             <goal>testCompile</goal>
                          </goals>
                       </pluginExecutionFilter>
                       <action>
                          <ignore></ignore>
                       </action>
                    </pluginExecution>
                 </pluginExecutions>
              </lifecycleMappingMetadata>
           </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>


     <dependency>
      <groupId>com.google.cloud.dataflow</groupId>
      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
      <version>2.5.0</version>
     </dependency>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>


    <!-- slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.msgpack</groupId>
      <artifactId>msgpack-core</artifactId>
      <version>0.8.16</version>
    </dependency>
  </dependencies>
</project>
