java Kafka消费者仅在生成“足够”的数据后读取
我在spring boot中实现了一个端点,当调用它时,它将转储卡夫卡主题中的所有消息(用于测试)
我期望的行为是,当制作人向“testTopic”主题写信,然后消费者进行民意调查时,它应该阅读刚刚生成的消息
我观察到的行为是,消费者未能消费生成的信息。此外,如果制作人产生更多的消息(比如10-15条),那么消费者会一次性将所有消息转储。从这一点开始,如果生产者产生一条信息,那么消费者将按预期消费
直觉上,我认为设置FETCH_MIN_BYTES_CONFIG
可能与此有关——也许消费者正在等待写入足够的字节。但这已经设置为1字节(默认值),并且不能解释后续成功的单个消息读取
接下来,我想我可能是在创建主题之前注册了消费者(通过太快地调用注册端点)。但我从kafka-topics.sh
确认,在注册消费者之前,该主题已经存在
我注意到,如果我启用了自动提交偏移量,那么行为有时与预期相符,有时则与预期不符。对于手动提交偏移量(下面的代码中没有显示),如上所述,这种行为非常奇怪
我还知道制作人通过使用kafka-console-consumer
确认它正在按预期工作
还尝试将轮询超时时间增加到1秒,但没有成功
// Consumer
@Component
public class TestConsumer{
private KafkaConsumer testConsumer = null;
public void registerConsumer(final String consumerId) {
if (consumer == null) {
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<some_address>:<some_port>");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
testConsumer = new KafkaConsumer<String, String>(props);
testConsumer.subscribe(Collections.singletonList("testTopic"));
}
else{
logger.debug("Consumer already registered");
}
}
public Map<String, List<String>> consume() {
Map<String, List<String>> messages = new HashMap<>();
if (testConsumer == null){
logger.error("testConsumer was not instantiated");
return null;
}
ConsumerRecords<String, String> records = testConsumer.poll(Duration.ofMillis(100));
List<String> buffer = new ArrayList<>();
for (ConsumerRecord<String, String> record: records){
logger.debug(String.format("Consuming %s", record.value()));
buffer.add(record.value());
}
messages.put("data", buffer);
return messages;
}
}
步骤顺序如下: 1.spring boot应用程序启动 2.卡夫卡主题已创建,我可以通过卡夫卡控制台确认 3.我登记生产者和消费者 4.制作人生产,我可以通过卡夫卡控制台(不同的消费者群体)确认这一点 5.消费者未能消费
我预计结果如下:
{
"data" : ["message1"]
}
我得到的是
{
"data" : []
}
你知道为什么消费者在写入一定数量的消息之前不消费记录吗
编辑1:
向消费者添加了props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
属性,但没有效果
# 1 楼答案
当您手动调用此
testConsumer.poll(Duration.ofMillis(100))
。你需要不断地集中精力讨论这个话题。就像在一个无限的while循环中。例如:看看这个链接:Kafka consumer