java可靠地从Kafka主题获取最后一条(已经生成的)消息
我正在执行类似以下伪代码的操作
var consumer = new KafkaConsumer();
consumer.assign(topicPartitions);
var beginOff = consumer.beginningOffsets(topicPartitions);
var endOff = consumer.endOffsets(topicPartitions);
var lastOffsets = Math.max(beginOff, endOff - 1));
lastOffsets.forEach(consumer::seek);
lastMessages = consumer.poll(1 sec);
// do something with the received messages
consumer.close();
在我做的简单测试中,这是可行的,但我想知道是否有一些情况,比如生产者崩溃等,偏移量不是单调增加1?在这种情况下,我是否必须seek()
及时返回,或者我是否可以从卡夫卡获取上一条已经生成的消息的消息偏移量强>
我没有使用事务,所以我们不需要担心读取已提交和未提交的消息
编辑:偏移不连续的示例是在日志压缩之后。但是,日志压缩应该始终保持最后一条消息,因为它显然比之前的所有消息(键是否相同)都更新。但理论上,最后一条消息之前的偏移量可以被压缩掉
# 1 楼答案
在kafka.apache.org/10/javadoc/中,明确提到,consumer.endOffsets
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
因此,当您获取
endOff - 1
时,它是您获取该主题分区的最后一条可用卡夫卡记录。因此,生产商的担忧不会因此受到影响还有一件事,补偿不是由制作人决定的。它由该主题分区的分区负责人决定。所以,它总是单调地增加1