有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java可靠地从Kafka主题获取最后一条(已经生成的)消息

我正在执行类似以下伪代码的操作

var consumer = new KafkaConsumer();
consumer.assign(topicPartitions);
var beginOff = consumer.beginningOffsets(topicPartitions);
var endOff = consumer.endOffsets(topicPartitions);
var lastOffsets = Math.max(beginOff, endOff - 1));
lastOffsets.forEach(consumer::seek);
lastMessages = consumer.poll(1 sec);
// do something with the received messages
consumer.close();

在我做的简单测试中,这是可行的,但我想知道是否有一些情况,比如生产者崩溃等,偏移量不是单调增加1?在这种情况下,我是否必须seek()及时返回,或者我是否可以从卡夫卡获取上一条已经生成的消息的消息偏移量

我没有使用事务,所以我们不需要担心读取已提交和未提交的消息

编辑:偏移不连续的示例是在日志压缩之后。但是,日志压缩应该始终保持最后一条消息,因为它显然比之前的所有消息(键是否相同)都更新。但理论上,最后一条消息之前的偏移量可以被压缩掉

Kafka Log Compaction


共 (1) 个答案

  1. # 1 楼答案

    kafka.apache.org/10/javadoc/中,明确提到,consumer.endOffsets

    Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.

    因此,当您获取endOff - 1时,它是您获取该主题分区的最后一条可用卡夫卡记录。因此,生产商的担忧不会因此受到影响

    还有一件事,补偿不是由制作人决定的。它由该主题分区的分区负责人决定。所以,它总是单调地增加1