有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用循环分区分配策略提供CommitFailedException

我用的是弹簧靴和弹簧卡夫卡 这是我的消费者配置

public ConsumerFactory<String, String> getConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
    return new DefaultKafkaConsumerFactory<>(
        props, new StringDeserializer(), new StringDeserializer());
  }

我正在使用手动提交的循环分区分配策略。但每当我打电话

acknowledgement.acknowledge()

我得到了这个例外

org.apache.kafka.clients.consumer.CommitFailedException

我该怎么解决这个问题


共 (0) 个答案