有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Kafka消费者仅在生成“足够”的数据后读取

我在spring boot中实现了一个端点,当调用它时,它将转储卡夫卡主题中的所有消息(用于测试)

我期望的行为是,当制作人向“testTopic”主题写信,然后消费者进行民意调查时,它应该阅读刚刚生成的消息

我观察到的行为是,消费者未能消费生成的信息。此外,如果制作人产生更多的消息(比如10-15条),那么消费者会一次性将所有消息转储。从这一点开始,如果生产者产生一条信息,那么消费者将按预期消费

直觉上,我认为设置FETCH_MIN_BYTES_CONFIG可能与此有关——也许消费者正在等待写入足够的字节。但这已经设置为1字节(默认值),并且不能解释后续成功的单个消息读取

接下来,我想我可能是在创建主题之前注册了消费者(通过太快地调用注册端点)。但我从kafka-topics.sh确认,在注册消费者之前,该主题已经存在

我注意到,如果我启用了自动提交偏移量,那么行为有时与预期相符,有时则与预期不符。对于手动提交偏移量(下面的代码中没有显示),如上所述,这种行为非常奇怪

我还知道制作人通过使用kafka-console-consumer确认它正在按预期工作

还尝试将轮询超时时间增加到1秒,但没有成功

// Consumer
@Component
public class TestConsumer{
    private KafkaConsumer testConsumer = null;

    public void registerConsumer(final String consumerId) {
        if (consumer == null) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");

            testConsumer = new KafkaConsumer<String, String>(props);
            testConsumer.subscribe(Collections.singletonList("testTopic"));
        }
        else{
            logger.debug("Consumer already registered");
        }
    }

    public Map<String, List<String>> consume() {
        Map<String, List<String>> messages = new HashMap<>();
        if (testConsumer == null){
            logger.error("testConsumer was not instantiated");
            return null;
        }
        ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
        List<String> buffer = new ArrayList<>(); 
        for (ConsumerRecord<String, String> record: records){
            logger.debug(String.format("Consuming %s", record.value()));
            buffer.add(record.value());
        }
        messages.put("data", buffer);
        return messages;
    }
}

步骤顺序如下: 1.spring boot应用程序启动 2.卡夫卡主题已创建,我可以通过卡夫卡控制台确认 3.我登记生产者和消费者 4.制作人生产,我可以通过卡夫卡控制台(不同的消费者群体)确认这一点 5.消费者未能消费

我预计结果如下:

{
    "data" : ["message1"]
}

我得到的是

{
    "data" : []
}

你知道为什么消费者在写入一定数量的消息之前不消费记录吗

编辑1: 向消费者添加了props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");属性,但没有效果


共 (1) 个答案

  1. # 1 楼答案

    当您手动调用此testConsumer.poll(Duration.ofMillis(100))。你需要不断地集中精力讨论这个话题。就像在一个无限的while循环中。例如:

    while (true) {
       Map records = consume();
       logger.debug("received records: {}", records);
    }
    

    看看这个链接:Kafka consumer