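Java: how to publish Row data as Avro records using a Kafka publisher
My goal is to use Flink to publish data to a Kafka topic using an Avro schema. My code is shown below (note that this is only part of the transformation). First, I use MapFunctions to compute new attributes in the different streams. Then, as shown below, I convert the streams to tables so that I can join them with an SQL query. After that, I convert the output table back into a stream. Finally, I want to publish this stream to a Kafka topic.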
StreamExecutionEnvironment DataStreamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
...
// Consuming streams from Kafka
// transform streams with MapFunctions
...
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(DataStreamEnvironment);
// converting to tables
tableEnv.createTemporaryView("tableA", streamA);
tableEnv.createTemporaryView("tableB", streamB);
tableEnv.createTemporaryView("tableC", streamC);
tableEnv.createTemporaryView("tableD", streamD);
// joining the registered tables
Table output_table = tableEnv.sqlQuery(
"SELECT * " +
"FROM tableC " +
"FULL OUTER JOIN tableD ON tableC._KEY_wDots_ = tableD.Key " +
"FULL OUTER JOIN tableA ON tableA._KEY_TNr_KSt_wDots_ = tableC._KEY_TNr_KSt_wDots_ " +
"FULL OUTER JOIN tableB ON tableB.Teilenummer = tableA._KEY_TNr_woDots_"
);
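Building a query by concatenating string literals with `+` makes it easy to lose the whitespace between clauses: `"FROM tableC" + "FULL OUTER JOIN ..."` becomes `FROM tableCFULL OUTER JOIN ...`, and `"tableB" + "ON"`-style typos read as `tableBON`. A minimal JDK-only sketch that joins the clauses with an explicit single space instead; the class `JoinQuery` and its `buildQuery` helper are hypothetical, and the table and column names are copied from the query in the question.

```java
import java.util.List;

public class JoinQuery {
    // Joins SQL clauses with exactly one space, so no clause boundary can be lost.
    static String buildQuery(List<String> clauses) {
        return String.join(" ", clauses);
    }

    public static void main(String[] args) {
        String query = buildQuery(List.of(
            "SELECT *",
            "FROM tableC",
            "FULL OUTER JOIN tableD ON tableC._KEY_wDots_ = tableD.Key",
            "FULL OUTER JOIN tableA ON tableA._KEY_TNr_KSt_wDots_ = tableC._KEY_TNr_KSt_wDots_",
            "FULL OUTER JOIN tableB ON tableB.Teilenummer = tableA._KEY_TNr_woDots_"));
        // The resulting string would then be passed to tableEnv.sqlQuery(...).
        System.out.println(query);
    }
}
```

On Java 15 or later, a text block does the same job, since the line breaks inside it count as whitespace to the SQL parser.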
// Convert back to DataStream
DataStream<Row> output_stream = tableEnv.toChangelogStream(output_table);
// output
FlinkKafkaProducer010<Row> kafkaProducer = new FlinkKafkaProducer010<>(
outputTopic,
new AvroRowSerializationSchema(AvroOutputSchemaClass.getClassSchema().toString()),
properties);
output_stream.addSink(kafkaProducer);
DataStreamEnvironment.execute("Merge Streams");
When I run the job, I get the following error message:
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
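I know that this error is thrown when Flink doesn't have enough type information and falls back to Kryo for serialization. I couldn't pin down exactly where the problem comes from, but I'm fairly sure it happens in the Kafka producer (FlinkKafkaProducer010). All the solutions I've seen deal with this when adding the source, using something like returns(TypeInformation), as here. I don't think that's possible in my case, because I create the stream from a table.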
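The serialization trace gives a hint about where the fallback happens: Kryo is reading a `GenericData$Record` as a whole stream element and descending through its `schema` into the `reserved` field of an `org.apache.avro.Schema$Field`. That suggests the Kryo-serialized type is a stream of Avro generic records being shipped between two operators (for example out of the Kafka source or one of the MapFunctions), rather than the `Row` stream going into the producer. As far as I know, Avro keeps `reserved` as an unmodifiable set, and a generic Kryo collection serializer rebuilds a set by creating an empty instance of the original class and calling `add` for each element, which an unmodifiable set rejects with `UnsupportedOperationException`. The JDK-only sketch below models only that last step with a hypothetical `rebuild` helper; it does not call Kryo or Avro, and the elements are samples.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UnmodifiableSetDemo {
    // Hypothetical model of a generic collection deserializer: it is handed an empty
    // instance of the original set class and re-adds every serialized element.
    // Returns "ok" on success, or the simple name of the exception that stopped it.
    static String rebuild(Set<String> emptyInstance, List<String> elements) {
        try {
            for (String element : elements) {
                emptyInstance.add(element);
            }
            return "ok";
        } catch (UnsupportedOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        List<String> elements = List.of("name", "type", "doc"); // sample elements only
        // A plain HashSet can be rebuilt element by element.
        System.out.println(rebuild(new HashSet<>(), elements)); // prints "ok"
        // An unmodifiable set rejects add(); this is the step assumed to fail for Avro's set.
        Set<String> frozen = Collections.unmodifiableSet(new HashSet<String>());
        System.out.println(rebuild(frozen, elements)); // prints "UnsupportedOperationException"
    }
}
```

If that reading is right, the type information has to be supplied where the generic records are produced, not at the sink: for example `.returns(new GenericRecordAvroTypeInfo(schema))` on the operator that emits `GenericRecord`s (with `schema` being their Avro `Schema`), or a Kafka consumer built with `AvroDeserializationSchema.forGeneric(schema)`, which reports that type itself. While debugging, `DataStreamEnvironment.getConfig().disableGenericTypes()` should make the job fail early on whichever type would otherwise fall back to Kryo, which helps locate the operator.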
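I have two questions:
- Can I even use AvroRowSerializationSchema() to publish Row data as Avro data?
- How do I tell Flink to use my Avro schema for serialization? Or, how can I give Flink enough information to serialize the data successfully?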
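On the first question: `AvroRowSerializationSchema` accepts either a specific-record class or an Avro schema string (the code above passes the string), and, as far as I can tell from the flink-avro sources, it copies the `Row` into the Avro record by position: field i of the row goes into field i of the schema, and names are not compared. With `SELECT *` over four joined tables, the row's columns therefore have to match the fields of the output schema in number and order; selecting explicit, aliased columns makes that easier to control. The JDK-only sketch below models that positional contract with a hypothetical `toRecord` helper and hypothetical field names; it is not Flink's implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PositionalRowMapping {
    // Hypothetical model of a positional Row-to-record copy: value i goes to schema field i.
    static Map<String, Object> toRecord(List<String> schemaFields, Object[] row) {
        if (row.length != schemaFields.size()) {
            throw new IllegalArgumentException(
                "row has " + row.length + " fields, schema has " + schemaFields.size());
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < row.length; i++) {
            record.put(schemaFields.get(i), row[i]); // column names are never consulted
        }
        return record;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("Teilenummer", "Key"); // hypothetical schema fields
        // Columns supplied in the wrong order are silently swapped, not rejected.
        System.out.println(toRecord(schema, new Object[] {"K-1", "4711"}));
        // prints {Teilenummer=K-1, Key=4711}
    }
}
```

Two related points are worth checking against the Flink version in use: every column of a FULL OUTER JOIN can be NULL, so the matching Avro fields need nullable unions such as `["null", "string"]`; and `toChangelogStream` can emit update and delete rows for such a join, whose `RowKind` a plain `AvroRowSerializationSchema` does not encode, so retractions would reach Kafka looking like ordinary records.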