有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Flink 1.11的java架构演化在MapState POJO类作为值时失败

我从Flink Docs开始测试下面的东西

Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints. POJO Class

我遵循的步骤:

  1. 通过以下给定的rules定义POJO类 e、 g
@Data
@NoArgsConstructor
public class SomePojo {
    @NonNull String field1;
    @NonNull String field2;
    @NonNull String field3;
    public static TypeInformation<SomePojo > getProducedType() {
        return TypeInformation.of(SomePojo .class);
    }
}
  1. 将窗口运算符和其中的MapState定义为
private transient MapState<String, SomePojo> mapState;  // declared class level field

MapStateDescriptor<String, SomePojo> mapStateDescriptor = 
          new MapStateDescriptor<>("state",Types.String, SomePojo.getProducedType());     // defined descriptor 
  1. 在KDA中部署了应用程序,现在通过某个键将SomePojo对象的值存储在MapState中
mapstate.put("someString", objectOfSomePojoClass);
  1. 更改SomePojo以删除第三个字段
@Data
@NoArgsConstructor
public class SomePojo {
    @NonNull String field1;
    @NonNull String field2;
    // @NonNull String field3;        Commented out 3rd field to check schema evolution
    public static TypeInformation<SomePojo > getProducedType() {
        return TypeInformation.of(SomePojo .class);
    }
}
  1. 重复第三步。但当flink开始抛出异常时,如下所示
2021-06-10 19:22:40
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_8a1a38027b51a601bd831c52e46e25fa_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
    at org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
    at org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
    at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
    at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
    at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
    at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:137)
    at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:71)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:153)
    at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:136)
    at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.getOrRegisterStateColumnFamilyHandle(AbstractRocksDBRestoreOperation.java:161)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:191)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
    ... 15 more

标注


共 (0) 个答案