Java: Flink Avro serialization shows a "not serializable" error when working with GenericRecords
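I am really struggling to get Flink to communicate properly with a running Kafka instance using Avro schemas from the Confluent Schema Registry (for both key and value).
After a while of thinking and restructuring my program, I was able to push my implementation this far:
Producer method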
public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {
    final Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
    properties.put("schema.registry.url", "http://--.-.-.---:8081");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter
    return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
            new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
            properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
GenericSerializer.java
package com.reeeliance.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{
    private String topic;
    private Schema schemaKey;
    private Schema schemaValue;
    private String registryUrl;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        super();
        this.topic = topic;
        this.schemaKey = schemaK;
        this.schemaValue = schemaV;
        this.registryUrl = url;
    }

    public GenericSerializer() {
        super();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
        byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);
        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }
}
However, when I execute the job, it fails in the preparation phase, before the job actually runs, with the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
- custom writeObject data (class "java.util.ArrayList")
- root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 8 more
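I know that every class has to implement the Serializable interface or be declared transient, but I am not using any classes of my own, and the error does not point at a function that is not serializable (which is what the usual threads deal with) but at a record or field. The field comes from the key schema, which contains only this one field. I assume my mistake lies somewhere in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord used for this kind of serialization a lot, so it does not make sense to me.
The class ConfluentRegistryAvroSerializationSchema is taken from GitHub, as it is not yet included in the Flink version we are currently using (1.9.1). I included the necessary classes and changed the class names, and I do not think this is the cause of my problem. (Issue solved)
Can anybody help me debug this? I would also appreciate it a lot if you could show me a different way to achieve the same goal; the incompatibility of Flink Avro and the Confluent Schema Registry has been driving me crazy so far.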
# Answer 1
I ran into the same error (Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field) when deploying a Flink job with my own serializer.
I don't want to use the Confluent Schema Registry. Any clues?
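# Answer 2
The problem is with the org.apache.avro.Schema$Field class. That class is not serializable, which causes this exception. The solution is also mentioned in a note in the Flink documentation: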
Since Avro’s Schema class is not serializable, it can not be sent around as is. You can work around this by converting it to a String and parsing it back when needed. If you only do this once on initialization, there is practically no difference to sending it directly.
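So the parsing has to happen after the serializer has been shipped, that is, inside serialize for the messages it receives. We cannot just parse once in the constructor: the parsed Schema would again be stored in a field of the serializer, which is the same as passing the Schema into the constructor directly.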
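Why parsing in the constructor does not help can be reproduced with plain Java serialization, which is what ClosureCleaner relies on according to the stack trace (InstantiationUtil.serializeObject). The following is only a minimal sketch: FakeSchema is a hypothetical stand-in for the non-serializable org.apache.avro.Schema, and the two holder classes and the isJavaSerializable helper are invented for illustration, not taken from Flink or Avro.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorParseDemo {

    /** Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable. */
    static class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
    }

    /** Parses in the constructor, so the parsed object still sits in a serialized field. */
    static class ParsesInConstructor implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeSchema schema;
        ParsesInConstructor(String schemaJson) { this.schema = new FakeSchema(schemaJson); }
    }

    /** Keeps only the schema text, so nothing non-serializable is stored. */
    static class KeepsString implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String schemaJson;
        KeepsString(String schemaJson) { this.schemaJson = schemaJson; }
    }

    /** Returns true if Java serialization accepts the object, false on NotSerializableException. */
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"string\"}";
        System.out.println(isJavaSerializable(new ParsesInConstructor(json)));
        System.out.println(isJavaSerializable(new KeepsString(json)));
    }
}
```

The first holder is rejected and the second is accepted, which matches the behaviour described above: what matters is the type of the fields at the moment the object is serialized, not where the parsing call is written.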
So the solution is to accept the Avro schemas as Strings in the constructor and to create the Avro Schema objects in the serialize method.
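A sketch of that pattern, again using only the JDK so it can be run without Flink or Avro on the classpath. FakeSchema and parse are hypothetical stand-ins for org.apache.avro.Schema and new Schema.Parser().parse(String), and StringSchemaSerializer stands in for a class like GenericSerializer. Caching the parsed schema in a transient field is an assumption added here so that parsing happens once per deserialized instance rather than once per message; the approach described above would also work by parsing on every call.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class LazySchemaDemo {

    /** Hypothetical stand-in for org.apache.avro.Schema: not Serializable. */
    static class FakeSchema {
        final String json;
        FakeSchema(String json) { this.json = json; }
    }

    /** Counts parse calls so the caching behaviour is observable. */
    static int parseCalls = 0;

    /** Hypothetical stand-in for new Schema.Parser().parse(json). */
    static FakeSchema parse(String json) {
        parseCalls++;
        return new FakeSchema(json);
    }

    /** Stand-in for the serializer: ships the schema as text, parses it on first use. */
    static class StringSchemaSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String schemaJson;       // serialized together with the object
        private transient FakeSchema schema;   // skipped by Java serialization, null after shipping

        StringSchemaSerializer(String schemaJson) { this.schemaJson = schemaJson; }

        byte[] serialize(String element) {
            if (schema == null) {
                schema = parse(schemaJson);    // runs once per deserialized instance
            }
            // A real implementation would encode the element with the schema here.
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Serializes and deserializes an object, roughly what happens when a job is shipped. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        StringSchemaSerializer shipped =
            roundTrip(new StringSchemaSerializer("{\"type\":\"string\"}"));
        shipped.serialize("a");
        shipped.serialize("b");
        shipped.serialize("c");
        System.out.println("parse calls: " + parseCalls);
    }
}
```

Applied to the GenericSerializer from the question, this would mean replacing the two Schema fields with String fields, passing schemaK.toString() and schemaV.toString() from the producer method (Schema.toString() returns the schema as JSON), and calling new Schema.Parser().parse(...) inside serialize.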
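One more thing to keep in mind is to provide the type information Flink needs to serialize Avro records (for example, flink-avro's GenericRecordAvroTypeInfo for a stream of GenericRecord); otherwise it will fall back to Kryo for serialization.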