JavaSpringBoot2.1。x与卡夫卡1.1的集成。十、
我们正在从1.5迁移现有的spring boot应用程序。x到2.1。x与卡夫卡的整合
当我们使用Kafka 1.0.0代理时,我们需要将Kafka客户端版本降级为1.1.1。因此,我们降级了客户端(已评论的pom内容),但降级后它无法使用消息。它可以与卡夫卡客户端2.0.1库配合使用
下面是pom文件。请帮忙
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>demo</artifactId>
<description>Demo project for Spring Boot</description>
<groupId>com.example</groupId>
<modelVersion>4.0.0</modelVersion>
<name>demo</name>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<relativePath/>
<version>2.1.15.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<!--<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>-->
</dependency>
<!--<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>-->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<scope>import</scope>
<type>pom</type>
<version>Greenwich.SR1</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
消费者代码:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test.topic", groupId = "test.group")
public void consume(Message message) {
System.out.println(message);
}
}
yml文件详细信息
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: test.group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
logging:
level:
root: INFO
提交的文件
https://github.com/spring-cloud/spring-cloud-stream/wiki/Kafka-Client-Compatibility
https://spring.io/projects/spring-kafka
# 1 楼答案
没有必要降级;现在,您可以将较新的kafka客户端与较旧的代理一起使用很长一段时间