Java Flink Avro serialization reports a "not serializable" error when using GenericRecords

I'm really struggling to get Flink to communicate properly with a running Kafka instance using Avro schemas from the Confluent Schema Registry (for both key and value).

After a while of thinking and restructuring my program, I was able to push my implementation this far:

Producer method

    public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {  
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
        properties.put("schema.registry.url", "http://--.-.-.---:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter


        return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
                new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }

GenericSerializer.java

    package com.reeeliance.flink;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import flinkfix.ConfluentRegistryAvroSerializationSchema;

    public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

        private String topic;
        private Schema schemaKey;
        private Schema schemaValue;
        private String registryUrl;

        public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
            super();
            this.topic = topic;
            this.schemaKey = schemaK;
            this.schemaValue = schemaV;
            this.registryUrl = url;
        }

        public GenericSerializer() {
            super();
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
            byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
            byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);

            return new ProducerRecord<byte[], byte[]>(topic, key, value);
        }

    }

However, when I execute the job, it fails during the preparation phase, before the job actually runs, with the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
    at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
    at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
    - custom writeObject data (class "java.util.ArrayList")
    - root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 8 more

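I know that all classes have to implement the Serializable interface or be marked transient, but I don't use my own classes, and the error doesn't point to a function that isn't serializable (which is what the usual threads deal with), but rather to a record or field. The field comes from the key schema, which contains only this one field. I assume my mistake lies somewhere in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord being used for this kind of serialization a lot, so it doesn't make sense to me.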

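ConfluentRegistryAvroSerializationSchema is taken from GitHub, because it is not yet included in the Flink version we are currently using (1.9.1). I included the necessary classes and the changed classes, and I don't think this is the cause of my problem. (Issue solved)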

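Can anyone help me debug this? I would also appreciate it a lot if you could show me a different way to achieve the same goal; the incompatibility between Flink Avro and the Confluent Schema Registry has been driving me mad so far.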


2 answers

  1. # Answer 1

    I ran into the same error (Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field) when deploying a Flink job. Below are my serializer and deserializer.

    public static class AVROGeneratorSchema implements SerializationSchema<GenericRecord> {

        @Override
        public byte[] serialize(GenericRecord genericData) {
            StringBuffer sb = new StringBuffer();
            sb.append((String) genericData.get("field1"));
            sb.append("::");
            sb.append((String) genericData.get("field2"));
            sb.append("::");
            sb.append((String) genericData.get("field3"));
            return (sb.toString()).getBytes();
        }
    }
    

    Deserializer:

    public static class AVROGeneratorDeSchema implements DeserializationSchema<GenericRecord> {

        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            GenericRecord responseData = new GenericData.Record(new Schema.Parser().parse(schemaStr));
            String[] tokens = new String(bytes).split("::");
            responseData.put("field1", tokens[0]);
            responseData.put("field2", tokens[1]);
            responseData.put("field3", tokens[2]);
            return responseData;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }

        @Override
        public boolean isEndOfStream(GenericRecord genericData) {
            return false;
        }
    }
    

    Here is the error stack:

    Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [field1 type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
        at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
        at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
    Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
        - custom writeObject data (class "java.util.ArrayList")
        - root object (class "org.apache.avro.Schema$LockableArrayList", [field1 type:STRING pos:0])
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.ArrayList.writeObject(ArrayList.java:766)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
        at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    

    I don't want to use the Confluent Schema Registry. Any clues?

  2. # Answer 2

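    The problem is the org.apache.avro.Schema$Field class. This class is not serializable, which causes the exception. The workaround is also mentioned in a note in the Flink documentation: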

    Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.

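    So the schema has to be parsed for each message. Parsing it once in the constructor does not help, because the parsed Schema would then be held by the object, which is the same as passing it to the constructor directly.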

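    The solution can therefore look like the snippet below. We accept the Avro schema as a String in the constructor and create the Avro Schema inside the serialize method.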

    class AvroMessageSerializationSchema(topic: String, schemaString: String, schemaRegistry: String) extends KafkaSerializationSchema[GenericRecord] {
    
      private def getSchema(schema: String): Schema = {
        new Schema.Parser().parse(schema)
      }
    
      override def serialize(element: GenericRecord, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        val schema = getSchema(schemaString)
        val value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistry).serialize(element)
        new ProducerRecord[Array[Byte], Array[Byte]](topic, value)
      }
    }
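
    Re-parsing the schema on every record works, but it is probably not the only option. Below is a minimal sketch of an alternative, using only the JDK: keep the schema string as the serialized field and mark the parsed object `transient`, so it is rebuilt once per deserialized instance on first use. `ParsedSchema`, `EagerHolder`, and `LazyHolder` are hypothetical names invented for this illustration (`ParsedSchema` plays the role of the non-serializable `org.apache.avro.Schema`); they are not Flink or Avro classes.

    ```java
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.NotSerializableException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class TransientFieldDemo {

        /** Hypothetical stand-in for org.apache.avro.Schema: deliberately not Serializable. */
        static final class ParsedSchema {
            final String definition;

            ParsedSchema(String definition) {
                this.definition = definition;
            }
        }

        /** Holds the parsed object as a regular field, so Java serialization rejects it. */
        static final class EagerHolder implements Serializable {
            private static final long serialVersionUID = 1L;
            private final ParsedSchema schema;

            EagerHolder(String definition) {
                this.schema = new ParsedSchema(definition);
            }
        }

        /** Ships only the string; the parsed object is transient and rebuilt on first use. */
        static final class LazyHolder implements Serializable {
            private static final long serialVersionUID = 1L;
            private final String definition;
            private transient ParsedSchema schema; // skipped by Java serialization
            private transient int parseCount;      // counts parses, for this demo only

            LazyHolder(String definition) {
                this.definition = definition;
            }

            ParsedSchema schema() {
                if (schema == null) {
                    schema = new ParsedSchema(definition);
                    parseCount++;
                }
                return schema;
            }

            int parseCount() {
                return parseCount;
            }
        }

        /** Returns true if plain Java serialization accepts the object. */
        static boolean isSerializable(Object value) {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(value);
                return true;
            } catch (NotSerializableException e) {
                return false;
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        /** Serializes and deserializes a LazyHolder, roughly what shipping it to a worker does. */
        static LazyHolder roundTrip(LazyHolder holder) {
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                    out.writeObject(holder);
                }
                try (ObjectInputStream in =
                        new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                    return (LazyHolder) in.readObject();
                }
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }

        public static void main(String[] args) {
            String definition = "{\"type\":\"string\"}";
            System.out.println(isSerializable(new EagerHolder(definition))); // prints false
            LazyHolder copy = roundTrip(new LazyHolder(definition));
            copy.schema();
            copy.schema();
            System.out.println(copy.parseCount()); // prints 1
        }
    }
    ```

    Applied to the `GenericSerializer` from the question, the same idea would mean passing `schemaK.toString()` and `schemaV.toString()` into the constructor, storing them as `String` fields, and rebuilding each `Schema` with `new Schema.Parser().parse(...)` the first time `serialize` is called. The two `ConfluentRegistryAvroSerializationSchema` instances could presumably be cached in transient fields the same way instead of being created per record.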

    One more thing to keep in mind is to provide the type information Flink needs to serialize Avro; otherwise it falls back to Kryo for serialization.

    implicit val typeInformation: TypeInformation[GenericRecord] = new GenericRecordAvroTypeInfo(avroSchema)
    

    avro serialization with flink