Java: how to make a Kafka consumer read from a specific topic partition in Spring Boot

I'm fairly new to Kafka and Spring Boot, and I'm trying to get my application to read from a specific partition of a topic:

@KafkaListener(id = "singleLnr", groupId = "${kafka.consumer.group.id}", containerFactory = "singleFactory", topicPartitions = @TopicPartition(topic = "${kafka.topic.singleAttendance}", partitions = {"0"}))
public void consume2(ConsumerRecord attendanceInfo) {
    System.out.println(attendanceInfo);
}

The singleFactory code:

@Bean(name = "singleFactory")
public KafkaListenerContainerFactory singleFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Map<String, String>> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(false);
    factory.setMessageConverter(converter());
    return factory;
}

And this is my consumer factory configuration:

@Bean(name = "consumerFactory")
public ConsumerFactory<String, Map<String, String>> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapAddress);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroupId);
    return new DefaultKafkaConsumerFactory<>(props);
}

When I try to run the program, it gives me this error:

Offset commit failed on partition single.attendance-0 at offset 308: The coordinator is not aware of this member.

and this warning:

failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

How do I get my consumer to read from a specific partition? Can you at least give me a hint?


1 answer

  1. Answer #1

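    Kafka assigns a topic's partitions to the consumers in a group on its own, so in this implementation there is no need to configure partitions in @KafkaListener. Subscribing by topic is enough: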

    @KafkaListener(id = "singleLnr", groupId = "${kafka.consumer.group.id}", containerFactory = "singleFactory", topics = "${kafka.topic.singleAttendance}")
    public void consume2(ConsumerRecord attendanceInfo) {
        System.out.println(attendanceInfo);
    }
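
The warning quoted in the question names two consumer settings, max.poll.interval.ms and max.poll.records, as the things to adjust. As a minimal sketch of that suggestion, the plain-Java snippet below builds those two overrides. The class name ConsumerTuning, the method pollTuning, and the values 600000 and 50 are illustrative assumptions, not values taken from the question and not recommended defaults.

```java
import java.util.Properties;

// Hypothetical helper: collects the two settings named in the warning.
class ConsumerTuning {

    // Returns overrides for the poll interval and the per-poll batch size.
    // Both keys are standard Kafka consumer configuration names.
    static Properties pollTuning(int maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        // Allow more time between two calls to poll() before the group rebalances.
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        // Return fewer records per poll() so each batch is processed sooner.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // Example values only (assumption): 10 minutes and 50 records.
        Properties props = pollTuning(600_000, 50);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the question's consumerFactory(), the same two entries could be added to the props map, for example with ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG and ConsumerConfig.MAX_POLL_RECORDS_CONFIG as keys. Whether this alone resolves the commit failure depends on how long each record actually takes to process.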