Java通过输入SpecificAvroSerde动态更改Kafka数据类型
import KafkaDataType;
...
...
final Serde<KafkaDataType> eventSchema = new SpecificAvroSerde<>();
...
...
StreamsBuilder builder = new StreamsBuilder();
KStream<String, KafkaDataType> eventStream = builder.stream(STREAM_TOPIC);
我的KafkaDataType
是一个自动从相关的.avsc
文件生成的avro模式。我的理解是KafkaDataType
必须是预定义的。然而,是否存在允许动态或泛型KafkaDataType
的现有方法?如果是这样的话,请提供一个示例代码块
我们的目标是使KafkaDataType
成为一种通用的数据类型,这样具有不同avro模式的不同卡夫卡流就可以进行交换,并由Java代码进行处理。目前,对于每个不同的avro模式,我需要将KafkaDataType
从.avsc
模式更改为特定的Java自动生成类
如果需要进一步澄清,请告诉我
# 1 楼答案
对于使用特定记录/序列,是
Avro SpecificRecord类(例如生成的类)从
GenericRecord
扩展而来,您可以将其与GenericSerde一起使用,而不是与特定类型一起使用,以处理同一流中的多种类型的记录