Java Beam: dynamically decoding Avro records using a schema registry

I have been trying to write a Beam pipeline that reads from a Kafka topic consisting of Avro records. The schema of these records can change quickly, so I want to use the Confluent Schema Registry to fetch the schema and decode the events before extracting the relevant common fields. Either I am doing something wrong, or the documentation is out of date. I followed this example: https://github.com/apache/beam/blob/dfa1e475194ac6f65c42da7b8cb8d5055dd1952c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L176-L198

 * <p>If you want to deserialize the keys and/or values based on a schema available in Confluent
 * Schema Registry, KafkaIO can fetch this schema from a specified Schema Registry URL and use it
 * for deserialization. A {@link Coder} will be inferred automatically based on the respective
 * {@link Deserializer}.
 *
 * <p>For an Avro schema it will return a {@link PCollection} of {@link KafkaRecord}s where key
 * and/or value will be typed as {@link org.apache.avro.generic.GenericRecord}. In this case, users
 * don't need to specify key or/and value deserializers and coders since they will be set to {@link
 * KafkaAvroDeserializer} and {@link AvroCoder} by default accordingly.
 *
 * <p>For example, below topic values are serialized with Avro schema stored in Schema Registry,
 * keys are typed as {@link Long}:
 *
 * <pre>{@code
 * PCollection<KafkaRecord<Long, GenericRecord>> input = pipeline
 *   .apply(KafkaIO.<Long, GenericRecord>read()
 *      .withBootstrapServers("broker_1:9092,broker_2:9092")
 *      .withTopic("my_topic")
 *      .withKeyDeserializer(LongDeserializer.class)
 *      // Use Confluent Schema Registry, specify schema registry URL and value subject
 *      .withValueDeserializer(
 *          ConfluentSchemaRegistryDeserializerProvider.of("http://localhost:8081", "my_topic-value"))
 *    ...

My code looks like this:

    p.apply("ReadFromKafka",
        KafkaIO.<Long, GenericRecord>read()
        .withBootstrapServers(options.getBootstrapServers())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(
                "http://public-kafka-registry.mydomain.com:8081",
                "my_topic-value"))
        .withNumSplits(1)
        .withoutMetadata()
    );

However, I get the following error:

incompatible types: no instance(s) of type variable(s) T exist so that org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider<T> conforms to java.lang.Class<? extends org.apache.kafka.common.serialization.Deserializer<org.apache.avro.generic.GenericRecord>>


Any help is appreciated, as I am not a Java expert.


1 Answer

  1. # Answer 1

    Try this; it works:

    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withConsumerConfigUpdates(ImmutableMap.of("specific.avro.reader", "true"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider
            .of(options.getSchemaRegistryURL(), "My_TOPIC-value"));
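    Once the values arrive as `GenericRecord`, the common fields can be read by name without compiling against the full schema, which is what makes this approach work when the schema evolves. A minimal sketch of that extraction step, using a hypothetical `Event` schema with a `user_id` field (the schema and field names are illustrative, not from the original question):

    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class GenericRecordFieldExtraction {
        public static void main(String[] args) {
            // Hypothetical schema standing in for whatever the registry returns;
            // only "user_id" is assumed to be a stable common field.
            String schemaJson = "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
                + "{\"name\":\"user_id\",\"type\":\"long\"},"
                + "{\"name\":\"payload\",\"type\":\"string\"}]}";
            Schema schema = new Schema.Parser().parse(schemaJson);

            GenericRecord record = new GenericData.Record(schema);
            record.put("user_id", 42L);
            record.put("payload", "hello");

            // Read the common field by name, ignoring the rest of the schema.
            long userId = (Long) record.get("user_id");
            System.out.println(userId);
        }
    }
    ```

    In the pipeline itself this lookup would live inside a `MapElements` or `DoFn` applied after the `KafkaIO` read, so only the shared fields need to stay stable across schema versions.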