有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用特定Id将架构添加到架构注册表

我们已经在KafkaStreams中使用Confluent Schema Registry一年多了,一切都很好;直到昨天

在UAT环境中,我们似乎删除了一个模式主题,并且我们的一个应用程序开始使用消息进行故障转移

[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531

我检查了模式注册表,发现主题丢失,并使用curl查询错误中列出的id 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531

然后回来了:

{"error_code":40403,"message":"Schema not found"}

我天真地尝试重新注册模式,但没有考虑它,它工作了,但模式注册的id与之前的1531 id不一样

我需要将模式注册到ID1531,因为主题中的现有消息已经在魔法字节中包含ID1531

我在https://docs.confluent.io/current/schema-registry/docs/develop/api.html检查了API文档,但没有看到任何关于为模式设置给定Id的内容

是否仍然需要使用schema registry将架构强制为特定Id

我知道有一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失,或者采取非常措施来修复主题数据


共 (1) 个答案

  1. # 1 楼答案

    Is there anyway to force a schema to a specific Id with schema registry?

    没有


    1531的ID实际上并没有“消失”,顺便说一句,它只是被标记为在注册表中被删除(使用_schemas主题查看它)


    据我所知,当你使用卡夫卡夫罗德列泽时,真的没有办法避免这个错误。您必须使用ByteArraydSerializer,然后使用Schema注册表客户端“修复”或“查找”正确的ID,然后反序列化消息的其余部分

    另一个选项是重置消费者组,以便完全跳过这些消息,或者设置异常处理Handling bad messages using Kafka's Streams API