Java Beam: dynamically decoding Avro records using the Schema Registry
I've been trying to write a Beam pipeline that reads from a Kafka topic consisting of Avro records. The schema of these records can change quickly, so I want to fetch the schema from the Confluent Schema Registry and use it to decode each event before extracting the relevant common fields. Either I'm doing something wrong or the documentation is out of date. I followed this example: https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b8cb8d5055dd1952c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L176-L198
* <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
* Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
* for deserialization. A {@link Coder} will be inferred automatically based on the respective
* {@link Deserializer}.
*
* <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
* and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
* don't need to specify key or/and value deserializers and coders since they will be set to {@link
* KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
*
* <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
* keys are typed as {@link Long}:
*
* <pre>{@code
* PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
* .apply(KafkaIO.<Long, GenericRecord>read()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withTopic("my_topic")
* .withKeyDeserializer(LongDeserializer.class)
* // Use Confluent Schema Registry, specify schema registry URL and value subject
* .withValueDeserializer(
* ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
* ...
My code looks like this:
p.apply("ReadFromKafka",
    KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
            "http://public-kafka-registry.mydomain.com:8081",
            "my_topic-value"))
        .withNumSplits(1)
        .withoutMetadata()
);
However, I get the following error:
incompatible types: no instance(s) of type variable(s) T exist so that org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider<T> conforms to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<org.apache.avro.generic.GenericRecord>>
Any help would be appreciated, as I'm not a Java expert.
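For context on what the compiler is complaining about: in older Beam releases, `withValueDeserializer` only accepts a `Class<? extends Deserializer<V>>`, while `ConfluentSchemaRegistryDeserializerProvider.of(...)` returns a `DeserializerProvider<T>` — hence the "incompatible types" error. An overload of `withValueDeserializer` that accepts a `DeserializerProvider` was added in a later Beam version (around 2.22.0, if I recall correctly). A minimal sketch under that assumption, with placeholder broker/registry addresses and topic names, could look like this:

```java
// Sketch assuming a Beam version whose KafkaIO.Read exposes the
// withValueDeserializer(DeserializerProvider<V>) overload (~2.22.0+).
// Broker address, registry URL, topic, and subject are placeholders.
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class ReadAvroFromKafka {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<Long, GenericRecord>> records =
        p.apply("ReadFromKafka",
            KafkaIO.<Long, GenericRecord>read()
                .withBootstrapServers("broker_1:9092")
                .withTopic("my_topic")
                .withKeyDeserializer(LongDeserializer.class)
                // DeserializerProvider overload: the writer schema is fetched
                // from the registry and an AvroCoder is inferred automatically,
                // so no explicit value coder is needed.
                .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                        "http://localhost:8081", "my_topic-value"))
                .withoutMetadata());

    // p.run().waitUntilFinish();  // requires a reachable Kafka cluster
  }
}
```

If your project pins an older Beam version where only the `Class`-based overload exists, upgrading the Beam SDK dependency should make the snippet above compile; this is an assumption based on the error signature, not something stated in the question.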
# Answer 1
Try this — it works: