我对python中的avro和kafka很陌生。我在Java和Maven中经常使用它
使用Java和Maven,可以将avro模式文件(.avsc)添加到特定文件夹,然后在pom文件上配置特定的Maven插件。当您构建或编译插件时,所有的工作都会将scema发布到模式注册表,并将所有avro模式编译成SpecificRecord
对于python,它有点不同,因为没有从avro模式到python类的编译
为了将模式加载到模式注册表并为我的avro消息使用特定类,我做了如下操作:
创建一个文件夹,我已将所有avro模式放置在其中avro_events
在avro_event
文件夹中,我为avro_event
文件夹中定义的每个模式添加了另一个包含python
文件的文件夹。每个.py
文件将模式从kafkian
库加载到AvroRecord
中:
from confluent_kafka import avro
from kafkian.serde.avroserdebase import AvroRecord
schema_path = "./avro_events/SchemaTest.avsc"
class SchemaTest(AvroRecord):
f = open(schema_path, "rb")
_schema = avro.loads(f.read())
当我的应用程序启动时,我使用python的confluent schema registry api注册avro_events
文件夹中的所有模式:
def register_schemas():
avro_folder = `avro_events`
file_names = os.listdir(avro_folder)
for file_name in file_names:
path = avro_folder + "/" + file_name
if not os.path.isdir(path):
f = open(path, "rb")
schema_def = f.read()
avro_schema = schema.AvroSchema(json.loads(schema_def))
subject_name = file_name.replace(".avsc","")
schema_id = self.__client.register(subject_name, avro_schema)
self.__logger.info(str.format("Registered {} schema with id {}", subject_name, schema_id))
f.close()
我不太喜欢我的解决方案,但我找不到任何关于如何处理这种情况的例子。有更好的方法注册avro模式(从.avsc文件)并生成AvroRecord
用于事件发布吗
目前没有回答
相关问题 更多 >
编程相关推荐