Java: Apache Beam for Google Cloud Dataflow 404 error when using BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED
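I am running into a problem when I run a Beam dataflow (written in Java) that loads data into Google BigQuery. The problem occurs in my development environment as well as in production.
I am processing multiple data items through a Beam pipeline on a separate thread. I use a ParDo for the processing, applied to a PCollection with apply, to transform the incoming data.
I then try to write the transformed data to Google BigQuery using a routine like the following: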
transformedData
    .apply("Load fact data",
        BigQueryIO.<ValidatedDataRecord>write()
            .to(new LoadDataFact.DynamicFactTableDestination(dataType.label))
            .withFormatFunction(new LoadDataFact.FactSerializationFn())
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
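The most important part here is that I use the following:
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
... because I want a new table to be created if it does not already exist.
The problem is that when new data arrives in the data stream and that data needs a new dataset, I often (but not always) get the following 404 error: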
Exception thrown in class : com.myOrg.myPackage.myClass Error : java.lang.RuntimeException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
{
"code" : 404,
"errors" : [ {
"domain" : "global",
"message" : "Not found: Table my-project:my-dataset.my-table",
"reason" : "notFound"
} ],
"message" : "Not found: Table my-project:my-dataset.my-table",
"status" : "NOT_FOUND"
}
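I have looked into what happens in the underlying Beam Java SDK. When the first new data item is processed, CreateTables.class should attempt to create the new table and, if that succeeds, add an entry for the new table to the static CreateTables.createdTables collection to indicate that the table has been created. The 404 error appears to occur after the table has been created (although sometimes the table does not get created at all). I am not sure what causes this behaviour or what triggers the 404 (the error message does not say much). In BigQuery, this kind of error normally means the table did not exist at the point where it was accessed.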
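As a way to picture the behaviour described above, here is a minimal plain-Java sketch of a "create once, then remember" cache. It is not the Beam source: CreateOnceSketch, TableCreator, and ensureTable are hypothetical names, and the real CreateTables logic may well differ.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of a "create once, then remember" table cache.
// This is NOT the Beam source; every name here is hypothetical.
class CreateOnceSketch {

    /** Hypothetical stand-in for the remote "create table" call. */
    interface TableCreator {
        void create(String tableSpec);
    }

    // JVM-wide record of tables this process believes it has created.
    private static final Set<String> CREATED_TABLES = ConcurrentHashMap.newKeySet();

    /** Calls the creator only the first time a given table spec is seen. */
    static void ensureTable(String tableSpec, TableCreator creator) {
        if (CREATED_TABLES.contains(tableSpec)) {
            return; // cache hit: the backend is not consulted again
        }
        synchronized (CREATED_TABLES) {
            if (!CREATED_TABLES.contains(tableSpec)) {
                creator.create(tableSpec);
                CREATED_TABLES.add(tableSpec); // recorded only after create returns
            }
        }
    }

    public static void main(String[] args) {
        int[] backendCalls = {0};
        TableCreator countingCreator = spec -> backendCalls[0]++;

        ensureTable("my-project:my-dataset.my-table", countingCreator);
        ensureTable("my-project:my-dataset.my-table", countingCreator);

        // prints: backend create calls: 1
        System.out.println("backend create calls: " + backendCalls[0]);
    }
}
```

In a model like this, once a table name is in the set the backend is never asked again, so if the table is not yet visible to the writer (or is removed later), subsequent writes would still assume it exists. Whether that is what produces the 404 here is an open question.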
CreateTables.class is in the org.apache.beam.sdk.io.gcp.bigquery package, which ships in the following Beam jar:
C:\Users\my.username\.m2\repository\org\apache\beam\beam-sdks-java-io-google-cloud-platform\2.5.0\beam-sdks-java-io-google-cloud-platform-2.5.0.jar!\org\apache\beam\sdk\io\gcp\bigquery\CreateTables.class
I have raised a bug on the Beam Jira: https://issues.apache.org/jira/browse/BEAM-7195
I tried updating my Beam SDK Java core library to v2.12.0 but, for some reason, the dataflow then stopped working altogether.
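Is there a way I can fix this? I use a custom class that extends the DynamicDestinations class that Beam uses for its processing (this is how the documentation says dynamic destinations should be implemented; see https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-dynamic-destinations).
My custom class is shown below. The getTable method is called while each data item is processed, to determine which table the data should be added to: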
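import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class LoadDataFact {
    public static class DynamicFactTableDestination extends DynamicDestinations<ValidatedDataRecord, String> {
        private static final long serialVersionUID = -1234561111111123456L;
        private static final String projectID = "my-project";
        private String dataType = "none";
        private String elementDuration = "unknown";

        public DynamicFactTableDestination(String dataType) {
            this.dataType = dataType;
        }

        // The destination key is the dataset name carried by each record.
        @Override
        public String getDestination(ValueInSingleWindow<ValidatedDataRecord> element) {
            return element.getValue().DatasetName;
        }

        // Maps a dataset name to a day-partitioned table named data_for_<dataType>.
        @Override
        public TableDestination getTable(String destination) {
            try {
                return new TableDestination(new TableReference()
                        .setProjectId(projectID)
                        .setDatasetId(destination)
                        .setTableId(String.format("data_for_%s", this.dataType)), "Data staging table",
                        new TimePartitioning()
                                .setType("DAY")
                                .setField("created_date_time"));
            }
            catch (Exception ex) {
                System.out.println("Error " + ex.getMessage());
            }
            return null;
        }
        // (any further members of this class are not shown in the post)
    }
    // (FactSerializationFn, used in the write step, is not shown in the post)
}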
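Since the 404 message identifies the table as project:dataset.table, one small diagnostic is to build the same string from the values getTable uses and log it for each destination. The sketch below is plain Java with no Beam dependency; TableSpecSketch, tableSpec, and the "sales" data type are hypothetical and only illustrate the naming rule from the class above.

```java
// Sketch: rebuild the "project:dataset.table" string that the 404 message uses,
// so it can be logged next to the destination that produced it.
// All names here are hypothetical and not part of Beam.
class TableSpecSketch {

    /** Applies the same data_for_<dataType> naming rule as getTable. */
    static String tableSpec(String projectId, String datasetId, String dataType) {
        String tableId = String.format("data_for_%s", dataType);
        return String.format("%s:%s.%s", projectId, datasetId, tableId);
    }

    public static void main(String[] args) {
        // prints: my-project:my-dataset.data_for_sales
        System.out.println(tableSpec("my-project", "my-dataset", "sales"));
    }
}
```

Comparing this string with the table named in the error should show which dataset and data type were being written when the 404 occurred.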
Here is my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myOrg.myNamespace</groupId>
<artifactId>my-artifact-name</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.5.0</beam.version>
<maven-compiler-plugin.version>3.6.2</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>
org.apache.maven.plugins
</groupId>
<artifactId>
maven-compiler-plugin
</artifactId>
<versionRange>
[@maven-compiler-plugin.version@,)
</versionRange>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore></ignore>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.8.16</version>
</dependency>
</dependencies>
</project>