Java: how to publish Row data as Avro records with a Kafka producer

My goal is to publish data with an Avro schema to a Kafka topic, using Flink. My code looks like this (note that this is only part of the transformation). First, I compute new attributes in the different streams with MapFunctions. Then (see below) I convert the streams to tables so that I can join them with a SQL query. After that, I convert the output table back into a stream. Finally, I want to publish this stream to a Kafka topic.

StreamExecutionEnvironment DataStreamEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
...
// Consuming streams from Kafka
// transform streams with MapFunctions
...

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(DataStreamEnvironment);
            
// converting to tables
tableEnv.createTemporaryView("tableA", streamA);
tableEnv.createTemporaryView("tableB", streamB);
tableEnv.createTemporaryView("tableC", streamC);
tableEnv.createTemporaryView("tableD", streamD);

//joining the registered tables
Table output_table = tableEnv.sqlQuery(
       "SELECT * " +
       "FROM tableC" +
       "FULL OUTER JOIN tableD ON tableC._KEY_wDots_ = tableD.Key " +
       "FULL OUTER JOIN tableA ON tableA._KEY_TNr_KSt_wDots_ = tableC._KEY_TNr_KSt_wDots_ " +
       "FULL OUTER JOIN tableBON tableB.Teilenummer = tableA ._KEY_TNr_woDots_"
       );

// Convert back to DataStream
DataStream<Row> output_stream = tableEnv.toChangelogStream(output_table);

// output
FlinkKafkaProducer010<Row> kafkaProducer = new FlinkKafkaProducer010<>(
                outputTopic,
                new AvroRowSerializationSchema(AvroOutputSchemaClass.getClassSchema().toString()),
                properties);
output_stream.addSink(kafkaProducer);

DataStreamEnvironment.execute("Merge Streams");
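
The MapFunctions themselves are omitted above; just to show the shape of that step, a simplified, made-up version of one of them looks roughly like this (the class name and the derived attribute are placeholders, not my real logic):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.types.Row;

// Placeholder example of one of the omitted transformation steps:
// copies the incoming fields and appends one derived attribute.
public class AddDerivedKey implements MapFunction<Row, Row> {
    @Override
    public Row map(Row input) {
        return Row.of(
                input.getField(0),
                input.getField(1),
                // made-up derived attribute, e.g. a reformatted key
                String.valueOf(input.getField(0)).replace('_', '.'));
    }
}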

When I run the job, the following error message appears:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:354)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:191)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:53)
    at org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.readInto(NonSpanningWrapper.java:337)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNonSpanningRecord(SpillingAdaptiveSpanningRecordDeserializer.java:128)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.readNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:103)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:93)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:95)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Unknown Source)

I know this error message is thrown when Flink does not have enough type information and falls back to Kryo for serialization. I cannot pinpoint exactly where the problem comes from, but I am fairly sure it happens in the FlinkKafkaProducer. All the solutions I have seen handle this when adding the source, using something like returns(TypeInformation). I don't think that is possible in my case, because I create the stream from a table.
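
For completeness, this is the kind of hint those solutions use: attach Avro type information to the operator that produces the records, for example with GenericRecordAvroTypeInfo from flink-avro (a sketch only; the stream and MapFunction names are placeholders, and it assumes that operator emits GenericRecord):

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;

// Placeholder names: giving the operator explicit Avro type information
// keeps Flink from falling back to Kryo for GenericRecord.
DataStream<GenericRecord> typedStream = someSourceStream
        .map(new SomeAvroMapFunction())
        .returns(new GenericRecordAvroTypeInfo(AvroOutputSchemaClass.getClassSchema()));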

I have two questions:

  1. Can I even publish Row data as Avro records using AvroRowSerializationSchema()?
  2. How can I tell Flink to use my Avro schema for serialization? Or, how can I provide enough information so that Flink can serialize the data successfully?
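
Regarding question 2, one thing I have been looking at (just a sketch, assuming a Flink version where toChangelogStream accepts a target schema; the column names and types are placeholders and would have to match output_table) is declaring the field types explicitly during the table-to-stream conversion, so the resulting Row stream carries full type information:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.types.Row;

// Placeholder columns: declare the exact field names and data types of
// output_table so the converted Row stream has explicit type information.
Schema targetSchema = Schema.newBuilder()
        .column("Teilenummer", DataTypes.STRING())
        .column("Key", DataTypes.STRING())
        .build();

DataStream<Row> typedOutputStream = tableEnv.toChangelogStream(output_table, targetSchema);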

0 Answers