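Our project has both Scala and Python code, and we need to send and consume Avro-encoded messages through Kafka.
I am sending Avro-encoded messages to Kafka from both Python and Scala. I have a producer written in Scala that uses Twitter's Bijection library to send Avro-encoded messages, as follows: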
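import com.twitter.bijection.avro.GenericAvroCodecs
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

// Load and parse the Avro schema from the classpath
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
val parser = new Schema.Parser()
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
// Build one record and serialize it to bytes
val avroRecord = new GenericData.Record(schema)
avroRecord.put("url_sha256", row._1)
avroRecord.put("url", row._2._1)
avroRecord.put("timestamp", row._2._2)
val recordBytes = recordInjection.apply(avroRecord)
kafkaProducer.value.send("topic", recordBytes)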
The Avro schema looks like this:
[schema snippet missing in the source]
I can decode these messages successfully in a Scala KafkaConsumer:
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
kafkaInputStream.foreachRDD(kafkaRDD => {
  kafkaRDD.foreach(
    avroRecord => {
      // Parse the schema inside the closure; only the schema string is captured
      val parser = new Schema.Parser()
      val schema = parser.parse(schemaFile)
      val recordInjection = GenericAvroCodecs[GenericRecord](schema)
      // .get throws if the bytes cannot be decoded
      val record = recordInjection.invert(avroRecord.value()).get
      println(record)
    }
  )
})
However, I cannot decode the messages in Python. I get the following exception:
'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte
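The Python code looks like this:
import io
import avro.io
import avro.schema

schema_path = "avro/url_info_schema.avsc"
schema = avro.schema.parse(open(schema_path).read())
# consumer is an existing Kafka consumer
for msg in consumer:
    # Treat the message value as a raw binary-encoded Avro datum
    bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    decoded_msg = reader.read(decoder)
    print(decoded_msg)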
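A quick way to see what a producer is really putting on the topic is to look at the first bytes of the payload. Per the Avro specification, an object container file starts with the four bytes O, b, j, 0x01, whereas a raw binary-encoded datum has no header at all. The helper below is only a diagnostic sketch using the standard library; the function names are made up for illustration and are not part of any Avro package.

```python
# Magic bytes that open every Avro object container file (Avro specification).
AVRO_CONTAINER_MAGIC = b"Obj\x01"


def looks_like_avro_container(payload: bytes) -> bool:
    """Return True if the payload begins with the container-file magic."""
    return payload[:4] == AVRO_CONTAINER_MAGIC


def describe_payload(payload: bytes) -> str:
    """Give a human-readable guess about how the payload is framed."""
    if looks_like_avro_container(payload):
        return "Avro container file (header with embedded schema, then data blocks)"
    # Absence of the magic proves nothing by itself, so this stays a guess.
    return "no container header; possibly a raw binary-encoded datum"
```

Calling describe_payload(msg.value) inside the consumer loop before decoding would show whether the BinaryDecoder is being handed container-file bytes, which it is not designed to read.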
Likewise, the Scala Avro consumer does not understand messages from the Python Avro producer; it throws an exception. The Python Avro producer looks like this:
bytes_writer = io.BytesIO()
datum_writer = avro.io.DatumWriter(schema)
encoder = avro.io.BinaryEncoder(bytes_writer)
# Write the datum as raw Avro binary (no container-file header)
datum_writer.write(data, encoder)
raw_bytes = bytes_writer.getvalue()
producer.send(topic, raw_bytes)
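To make concrete what a raw binary payload contains, here is a hand-rolled sketch of the Avro binary encoding for one record, using only the standard library. It assumes, purely for illustration, that url_sha256 and url are Avro strings and timestamp is an Avro long; the real schema is not shown in this thread, so treat the field types and all function names as hypothetical. In practice the avro package does this work.

```python
import io


def encode_long(n: int) -> bytes:
    """Avro long: zigzag-map the signed value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)  # zigzag; result is non-negative for 64-bit inputs
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def decode_long(buf: io.BytesIO) -> int:
    """Inverse of encode_long."""
    z, shift = 0, 0
    while True:
        chunk = buf.read(1)
        if not chunk:
            raise EOFError("truncated varint")
        z |= (chunk[0] & 0x7F) << shift
        if not chunk[0] & 0x80:
            return (z >> 1) ^ -(z & 1)  # undo zigzag
        shift += 7


def decode_string(buf: io.BytesIO) -> str:
    """Inverse of encode_string."""
    return buf.read(decode_long(buf)).decode("utf-8")


def encode_record(url_sha256: str, url: str, timestamp: int) -> bytes:
    """A record is just its fields in schema order, with no header or names."""
    return encode_string(url_sha256) + encode_string(url) + encode_long(timestamp)


def decode_record(payload: bytes) -> dict:
    """Read the three assumed fields back in the same order."""
    buf = io.BytesIO(payload)
    return {
        "url_sha256": decode_string(buf),
        "url": decode_string(buf),
        "timestamp": decode_long(buf),
    }
```

For example, encode_record("ab", "http://x", 3) yields b"\x04ab\x10http://x\x06". The payload carries neither the schema nor any magic bytes, so both sides must agree on the schema and on this framing in advance.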
How can I make Python and Scala consistent with each other? Any suggestions would be appreciated.
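I was using a binary encoder in Python but no explicit encoder in Scala. I only had to change from
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
to
[code snippet missing in the source]
I hope others find this useful. No change is needed in the Python code.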
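The mismatch described in this answer can be illustrated by framing the same datum both ways. The sketch below assumes that the default GenericAvroCodecs[GenericRecord](schema) injection in bijection-avro writes the Avro object container file format, and that the library's GenericAvroCodecs.toBinary variant writes the bare datum; that is my understanding of the library, so verify it against the version you use. The wrap_in_container function, the placeholder schema string, and the sample bytes are hypothetical; the layout follows the Avro specification (magic, metadata map, 16-byte sync marker, then data blocks).

```python
MAGIC = b"Obj\x01"  # container-file magic from the Avro specification


def encode_long(n: int) -> bytes:
    """Avro long: zigzag-map the signed value, then emit a base-128 varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_bytes(b: bytes) -> bytes:
    """Avro bytes/string: length as a long, followed by the raw bytes."""
    return encode_long(len(b)) + b


def wrap_in_container(schema_json: str, datum: bytes, sync: bytes) -> bytes:
    """Frame one already-encoded datum as a minimal, uncompressed container file."""
    if len(sync) != 16:
        raise ValueError("sync marker must be exactly 16 bytes")
    meta = {
        "avro.schema": schema_json.encode("utf-8"),
        "avro.codec": b"null",  # no compression
    }
    header = bytearray(MAGIC)
    header += encode_long(len(meta))  # a single map block holding every entry
    for key, value in meta.items():
        header += encode_bytes(key.encode("utf-8"))
        header += encode_bytes(value)
    header += encode_long(0)  # zero-count block terminates the map
    header += sync
    # One data block: object count, byte size, the objects, then the sync marker.
    block = encode_long(1) + encode_long(len(datum)) + datum + sync
    return bytes(header) + block
```

With a raw datum such as b"\x04ab", wrap_in_container('{"type": "string"}', b"\x04ab", b"\x00" * 16) returns bytes that begin with Obj\x01 and the embedded schema, and only then contain the datum. A reader expecting the bare datum starts parsing at the magic bytes and fails, and a container reader given a bare datum finds no magic. Using raw binary on both sides, as the answer does, removes the header from the Scala side so the two implementations agree.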