有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java这是使用kafka consumer的pause()和resume()的正确方法吗?

我正在尝试使用Kafka Consumer Interface中的pause()和resume()API在Kafka消费流中实现背压。我的想法是暂停()消费,直到处理当前轮询的消息

    @Override
    public void run() {
        subscribe(topics);
        //noinspection InfiniteLoopStatement
        while (true) {
            final ConsumerRecords<String, Message> records = poll(Duration.of(5, ChronoUnit.SECONDS));
            if (records == null || records.isEmpty()) {
                continue;
            }
            log.info("Pausing the subscription");
            pause(this.assignment());
            log.info("Paused the subscription");
            List<CompletableFuture<Void>> completableFutureList = new ArrayList<>();
            records.forEach(record -> {
                completableFutureList.add(CompletableFuture.runAsync(() -> {
                    try {
                        processor.read(record.value());
                    } catch (JsonProcessingException e) {
                        log.error("Could not process message", e);
                    }
                }, executorService));
            });
            log.info("Waiting for completable futures to finish");
            CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join();
            log.info("Completable Futures finished");
            log.info("Resuming the subscription");
            commitSync();
            resume(this.assignment());
            log.info("Resumed the subscription");
        }
    }

从dochere可以看出,poll()将继续被调用,并且在使用resume(Collection)恢复这些分区之前,它不会从这些分区返回任何记录。请注意,此方法不会影响导致分区重新平衡的分区订阅

在这方面,我的实现正确吗?我是阻止poll()还是poll()由另一个心跳线程处理


共 (0) 个答案