java使用循环分区分配策略提供CommitFailedException
我用的是弹簧靴和弹簧卡夫卡 这是我的消费者配置
public ConsumerFactory<String, String> getConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
return new DefaultKafkaConsumerFactory<>(
props, new StringDeserializer(), new StringDeserializer());
}
我正在使用手动提交的循环分区分配策略。但每当我打电话
acknowledgement.acknowledge()
我得到了这个例外
org.apache.kafka.clients.consumer.CommitFailedException
我该怎么解决这个问题
共 (0) 个答案