使用Spring integration java配置创建消息驱动的入站通道适配器以连接到Kafka 1 周,4 日 Questions & Answers 418 使用spring集成Java dsl创建kafka消费者的Java代码
# 1 楼答案 使用下面的代码创建一个消息驱动适配器,以便连接以使用来自kafka的消息 @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.topic}") private String springIntegrationKafkaTopic; @Bean public IntegrationFlow kafkaReader() throws Exception { return IntegrationFlows .from(Kafka.messageDrivenChannelAdapter(listener(),ListenerMode.record)) .channel("queureader") .get(); } @Bean public KafkaMessageListenerContainer listener() { return new KafkaMessageListenerContainer(consumerFactory(), new ContainerProperties(this.springIntegrationKafkaTopic)); } @Bean public ConsumerFactory consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaListener"); return new DefaultKafkaConsumerFactory(props); } @ServiceActivator(inputChannel = "queureader") public void Print(Message<?> msg) { System.out.println(msg.getPayload().toString()); } 正在申请中。属性 春天。卡夫卡。bootstrap servers=我们需要提到服务器名称 -春天。卡夫卡。topic=主题的名称 你可以提到ConsumerConfig的任何价值。组ID配置
# 1 楼答案
使用下面的代码创建一个消息驱动适配器,以便连接以使用来自kafka的消息
正在申请中。属性
你可以提到ConsumerConfig的任何价值。组ID配置