在python上使用kafka和模式注册表管理和加载avro模式文件的最佳方法

2024-10-03 09:15:48 发布

您现在位置:Python中文网/ 问答频道 /正文

我对python中的avro和kafka很陌生。我在Java和Maven中经常使用它

使用Java和Maven,可以将avro模式文件(.avsc)添加到特定文件夹,然后在pom文件上配置特定的Maven插件。当您构建或编译插件时,所有的工作都会将scema发布到模式注册表,并将所有avro模式编译成SpecificRecord

对于python,它有点不同,因为没有从avro模式到python类的编译

为了将模式加载到模式注册表并为我的avro消息使用特定类,我做了如下操作:

  • 创建一个文件夹,我已将所有avro模式放置在其中avro_events

  • avro_event文件夹中,我为avro_event文件夹中定义的每个模式添加了另一个包含python文件的文件夹。每个.py文件将模式从kafkian库加载到AvroRecord中:

      from confluent_kafka import avro
      from kafkian.serde.avroserdebase import AvroRecord
    
      schema_path = "./avro_events/SchemaTest.avsc"
    
      class SchemaTest(AvroRecord):
         f = open(schema_path, "rb")
         _schema = avro.loads(f.read())
    
  • 当我的应用程序启动时,我使用python的confluent schema registry api注册avro_events文件夹中的所有模式:

        def register_schemas():
          avro_folder = `avro_events`
          file_names = os.listdir(avro_folder)
          for file_name in file_names:
              path = avro_folder + "/" + file_name
              if not os.path.isdir(path):
                  f = open(path, "rb")
                  schema_def = f.read()
                  avro_schema = schema.AvroSchema(json.loads(schema_def))
                  subject_name = file_name.replace(".avsc","")
                  schema_id = self.__client.register(subject_name, avro_schema)
                  self.__logger.info(str.format("Registered {} schema with id {}", subject_name, schema_id))
                  f.close()
    

我不太喜欢我的解决方案,但我找不到任何关于如何处理这种情况的例子。有更好的方法注册avro模式(从.avsc文件)并生成AvroRecord用于事件发布吗


Tags: 文件pathname文件夹schemadef模式folder