Flink 1.11的java架构演化在MapState POJO类作为值时失败
我从Flink Docs开始测试下面的东西
Fields can be removed. Once removed, the previous value for the removed field will be dropped in future checkpoints and savepoints. POJO Class
我遵循的步骤:
- 通过以下给定的rules定义POJO类 e、 g
@Data
@NoArgsConstructor
public class SomePojo {
@NonNull String field1;
@NonNull String field2;
@NonNull String field3;
public static TypeInformation<SomePojo > getProducedType() {
return TypeInformation.of(SomePojo .class);
}
}
- 将窗口运算符和其中的MapState定义为
private transient MapState<String, SomePojo> mapState; // declared class level field
MapStateDescriptor<String, SomePojo> mapStateDescriptor =
new MapStateDescriptor<>("state",Types.String, SomePojo.getProducedType()); // defined descriptor
- 在KDA中部署了应用程序,现在通过某个键将SomePojo对象的值存储在MapState中
mapstate.put("someString", objectOfSomePojoClass);
- 更改SomePojo以删除第三个字段
@Data
@NoArgsConstructor
public class SomePojo {
@NonNull String field1;
@NonNull String field2;
// @NonNull String field3; Commented out 3rd field to check schema evolution
public static TypeInformation<SomePojo > getProducedType() {
return TypeInformation.of(SomePojo .class);
}
}
- 重复第三步。但当flink开始抛出异常时,如下所示
2021-06-10 19:22:40
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:204)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:247)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:474)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:470)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:529)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:724)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:549)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_8a1a38027b51a601bd831c52e46e25fa_(1/2) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:329)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:535)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more
Caused by: java.lang.NullPointerException
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:56)
at org.apache.flink.api.common.typeutils.CompositeSerializer$PrecomputedParameters.precompute(CompositeSerializer.java:228)
at org.apache.flink.api.common.typeutils.CompositeSerializer.<init>(CompositeSerializer.java:51)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer.<init>(TtlStateFactory.java:250)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:359)
at org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializerSnapshot.createOuterSerializerWithNestedSerializers(TtlStateFactory.java:330)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:550)
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:517)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.snapshotsToRestoreSerializers(NestedSerializersSnapshotDelegate.java:225)
at org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate.getRestoredNestedSerializers(NestedSerializersSnapshotDelegate.java:83)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.restoreSerializer(CompositeTypeSerializerSnapshot.java:204)
at org.apache.flink.runtime.state.StateSerializerProvider.previousSchemaSerializer(StateSerializerProvider.java:189)
at org.apache.flink.runtime.state.StateSerializerProvider.currentSchemaSerializer(StateSerializerProvider.java:164)
at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.getStateSerializer(RegisteredKeyValueStateBackendMetaInfo.java:137)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(RocksDbTtlCompactFiltersManager.java:71)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createColumnFamilyDescriptor(RocksDBOperationUtils.java:153)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.createStateInfo(RocksDBOperationUtils.java:136)
at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.getOrRegisterStateColumnFamilyHandle(AbstractRocksDBRestoreOperation.java:161)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:191)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 15 more
标注
- 我已经测试了添加新字段的效果,并设置了默认值李>
- 我使用的运行时是Flink-1_11
- 我已经看到模式演化已经在这里为mapstate修复了https://issues.apache.org/jira/browse/FLINK-11947
共 (0) 个答案