java使用特定Id将架构添加到架构注册表
我们已经在KafkaStreams中使用Confluent Schema Registry一年多了,一切都很好;直到昨天
在UAT环境中,我们似乎删除了一个模式主题,并且我们的一个应用程序开始使用消息进行故障转移
[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531
我检查了模式注册表,发现主题丢失,并使用curl查询错误中列出的id 1531,例如:
curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
然后回来了:
{"error_code":40403,"message":"Schema not found"}
我天真地尝试重新注册模式,但没有考虑它,它工作了,但模式注册的id与之前的1531 id不一样
我需要将模式注册到ID1531,因为主题中的现有消息已经在魔法字节中包含ID1531
我在https://docs.confluent.io/current/schema-registry/docs/develop/api.html检查了API文档,但没有看到任何关于为模式设置给定Id的内容
是否仍然需要使用schema registry将架构强制为特定Id
我知道有一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失,或者采取非常措施来修复主题数据
# 1 楼答案
没有
1531的ID实际上并没有“消失”,顺便说一句,它只是被标记为在注册表中被删除(使用
_schemas
主题查看它)据我所知,当你使用卡夫卡夫罗德列泽时,真的没有办法避免这个错误。您必须使用ByteArraydSerializer,然后使用Schema注册表客户端“修复”或“查找”正确的ID,然后反序列化消息的其余部分
另一个选项是重置消费者组,以便完全跳过这些消息,或者设置异常处理Handling bad messages using Kafka's Streams API