java这是使用kafka consumer的pause()和resume()的正确方法吗?
我正在尝试使用Kafka Consumer Interface中的pause()和resume()API在Kafka消费流中实现背压。我的想法是暂停()消费,直到处理当前轮询的消息
@Override
public void run() {
subscribe(topics);
//noinspection InfiniteLoopStatement
while (true) {
final ConsumerRecords<String, Message> records = poll(Duration.of(5, ChronoUnit.SECONDS));
if (records == null || records.isEmpty()) {
continue;
}
log.info("Pausing the subscription");
pause(this.assignment());
log.info("Paused the subscription");
List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
records.forEach(record -> {
completableFutureList.add(CompletableFuture.runAsync(() -> {
try {
processor.read(record.value());
} catch (JsonProcessingException e) {
log.error("Could not process message", e);
}
}, executorService));
});
log.info("Waiting for completable futures to finish");
CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
log.info("Completable Futures finished");
log.info("Resuming the subscription");
commitSync();
resume(this.assignment());
log.info("Resumed the subscription");
}
}
从dochere可以看出,poll()将继续被调用,并且在使用resume(Collection)恢复这些分区之前,它不会从这些分区返回任何记录。请注意,此方法不会影响导致分区重新平衡的分区订阅
在这方面,我的实现正确吗?我是阻止poll()还是poll()由另一个心跳线程处理
共 (0) 个答案