合流kafka生产者avro架构错误ClientError:架构分析失败:未知命名架构

2024-10-01 09:36:25 发布

您现在位置:Python中文网/ 问答频道 /正文

我在卡夫卡制片方工作,在主题中推送信息。 我正在使用confluent-kafkaavro producer

Liked issue on github

下面是我的模式.avsc文件

按键。avsc

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Keys",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        }
    ]
}

Test.avsc

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Subscription",
    "fields": [
        {
            "name": "test",
            "type": "string"
        },
        {
            "name": "keys",
            "type": "io.codebrews.schema.test.Keys"
        }
    ]
}

Producer.py

key_schema, value_schema = load_avro_schema_from_file('Subscription.avsc')

try:
    producer = avro.AvroProducer(producer_config, default_key_schema=key_schema, default_value_schema=value_schema)
except Exception as e:
    raise e

def load_avro_schema_from_file(schema_file):
    key_schema_string = """
    {"type": "string"}
    """

    key_schema = avro.loads(key_schema_string)
    value_schema = avro.load("./avro/" + schema_file)

    return key_schema, value_schema

当我尝试注册Keys.avsc时,它工作正常,没有错误。但是当我在注册{}之后尝试注册{}时。我得到下面的错误

confluent_kafka.avro.error.ClientError: Schema parse failed: Unknown named schema 'io.codebrews.schema.test.Keys', known names: ['io.codebrews.schema.test.Subscription'].

在手动注册模式之后

{
    "namespace": "io.codebrews.schema.test",
    "type": "record",
    "name": "Subscription",
    "fields": [
        {
            "name": "test",
            "type": "string"
        },
        {
            "name": "keys",
            "type": "Keys"
        }
    ]
}

当在我的主题中推送消息时,我得到以下错误

ClientError: Incompatible Avro schema:409 message:{'error_code': 409, 'message': 'Schema being registered is incompatible with an earlier schema for subject "test-value".

我做错什么了吗

还有谁能帮我停止python中的自动模式注册


Tags: producerkeynameioteststringvalueschema
1条回答
网友
1楼 · 发布于 2024-10-01 09:36:25

错误与解析器有关,而与注册表注册无关

您的AVSC文件需要完全包含所有记录类型。当解析器读取一个文件时,它无法了解其他文件

如果您从一个AVDL文件开始,然后将其转换为AVSC,那么这些记录会正确地嵌入到外部记录中

具体来说,

"fields" :[{
  "name": "keys",
  "type": {
       "type": "record", 
       "namespace": "io.codebrews.schema.test.Keys", 
       ... 
  } 
}

相关问题 更多 >