有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!


共 (1) 个答案

  1. # 1 楼答案

    使用下面的代码创建一个消息驱动适配器,以便连接以使用来自kafka的消息

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.topic}")
    private String springIntegrationKafkaTopic;
    @Bean
    public IntegrationFlow kafkaReader() throws Exception {
    
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(listener(),ListenerMode.record))
                .channel("queureader")
                .get();
    }
    
    @Bean
    public KafkaMessageListenerContainer listener() {
        return new KafkaMessageListenerContainer(consumerFactory(), new ContainerProperties(this.springIntegrationKafkaTopic));
    }
    
    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafkaListener");
        return new DefaultKafkaConsumerFactory(props);
    }
    
    
    @ServiceActivator(inputChannel = "queureader")
    public void Print(Message<?> msg)  {
    
        System.out.println(msg.getPayload().toString());
    }
    

    正在申请中。属性

    • 春天。卡夫卡。bootstrap servers=我们需要提到服务器名称 -春天。卡夫卡。topic=主题的名称

    你可以提到ConsumerConfig的任何价值。组ID配置