java使用单个json,但发布多个AVRO消息
我不熟悉春天的卡夫卡河。我最近设置了一个项目,并尝试使用kafka stream API发布AVRO。 要求: 需要解析可能包含多个子JSON的复杂Jsonobject。解析json并为每个子json发送AVRO消息。它也可以是一个或多个
我知道如何使用kafka客户端API实现这一点。这很简单,可以用循环完成,但用流API,我是新手
流类的当前代码如下所示:
@Service
@Log4j2
@EnableBinding(StreamBindings.class)
public class StreamListener {
@Autowired
Service service;
@StreamListener("Processor-input-channel")
@SendTo("Processor-output-channel")
public KStream<String, AVROClass> process(KStream<String, String> input){
//parse input , map to fault and change received key to time stamp and send
KStream<String, AVROClass> kStream = input
.mapValues(v -> service.getAVROResponse(v))
.map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));
kStream.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));
return kStream;
}
我一次生成一个AVRO,但我需要知道如何生成多个AVRO,并在卡夫卡中将其作为单独的消息发送到输出主题。因为我从getAVROResponse返回的任何内容都将通过流自动发送
# 1 楼答案
要使用JSON,需要使用String/JSON-Serde,而不是
AvroClass
(假设这实际上是一个AvroSpecificRecord
子类)否则,您似乎缺少Avro Serde,
Consumed.with(<<avroSerde>>))
,或者在StreamsConfig中将其设置为默认Serde如果要根据某些条件(嵌套类型)将传入消息分离到几个不同的主题,请使用
branch
如果要将一条消息转换为多条消息,然后转换为某个输出主题,请使用
flatMapValues().to()
要生成Avro,请确保在
.to()
输出中使用Produced.with()
# 2 楼答案
您应该能够使用Kafka Streams和Spring Cloud Stream完成这个用例,如下所示。请注意,不再建议使用
StreamListener
,建议使用函数方法。见下文这只是一个psuedo代码,请根据需要进行更新
在配置中,您需要根据您的设置执行此操作。请适当更新架构注册表URL
如果您不想像上面那样将Avro-serde指定为Springbean,那么可以按照下面的方式进行配置
然而,我认为将avroserde指定为springbean可能是一种更干净的方法,因为springcloudstreambinder可以将函数签名的类型与serdebean的类型进行匹配。例如,在这种情况下,绑定器将对函数进行内省,并将其输出签名
KStream<String, AvroClass>
与Serde bean上定义的内容Serde<AvroClass>
相匹配,并将其指定为出站上的值Serde