有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Spring Boot Kafka消费者自定义JsonDeserializer,类型信息在yaml/properties中,不带@Bean

是否可以将类型信息传递给Kafka使用者的反序列化程序,而不使用consumerFactory()containerFactory@Bean定义@Configuration

@Bean方法中,我必须将yml文件中已有的所有配置再次放入映射,并将其传递给工厂的构造函数,但我认为这是一种开销。我想找到一种方法,将所有配置保存在yaml/properties中。(我认为将配置放在yaml中比放在代码中更干净)

我想为每个消费者/每个方法指定类型,并用@KafkaListener注释,因为我将收到不同的JSON,映射到其相应的DTO,所以通用的spring.json.value.default.type在这里不适用。(我觉得很难看)

我正在使用我的代码进行测试(使用EmbeddedKafka的Spring Kafka测试),但现在它抱怨找不到类型信息:

o.s.k.listener.LoggingErrorHandler - Error while processing: null org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition checkout_order_paid-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
    at org.springframework.util.Assert.state(Assert.java:73)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:326)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1041)
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3300(Fetcher.java:110)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1223)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1072)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:562)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:523)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:719)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:676)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)

我知道这些文档,但不知道如何正确操作:

https://docs.spring.io/spring-kafka/reference/html/#spring-messaging-message-conversion


共 (0) 个答案