有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Kafka新使用者:(使用assign而非subscribe重新)设置和提交偏移量

使用新的Kafka Java consumer api,我运行一个消费者来消费消息。当所有可用的消息都被使用时,我用kill -15终止它

现在我想重置偏移以开始。我希望避免只使用不同的消费群体。我尝试的是以下一系列调用,使用与刚刚读取完数据的消费者相同的组

assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));

我以为我在测试中得到了这个结果,但现在我总是得到:

ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)

原则上把assigncommitSync结合起来是错误的吗?可能是因为只有subscribecommitSync结合在一起?文档只说assign不符合subscribe,但我认为这只适用于一个消费者流程。(事实上,我甚至希望在另一个耗电元件启动时运行偏移重置耗电元件,希望另一个可能会注意到偏移量的变化并重新开始。但是先关闭它也可以。)

有什么想法吗


共 (2) 个答案

  1. # 1 楼答案

    发现了问题。鉴于我们尊重以下条件,我的问题中描述的方法效果良好:

    1. 可能没有其他消费者使用目标group.id运行。即使消费者只订阅了其他主题,这也会妨碍在调用assign()而不是subscribe()后提交主题偏移量

    2. 在最后一个消费者停止后,需要30秒(我认为是^{)操作才能成功。来自卡夫卡的指示性日志消息是

      Group X generation Y is dead and removed
      

    一旦这出现在日志中,序列

    assign(topicPartition);
    OffsetAndMetadata om = new OffsetAndMetadata(0);
    commitSync(Collections.singletonMap(topicPartition, 0));
    

    你可以成功

  2. # 2 楼答案

    为什么一开始就要提交偏移量呢? 在Properties中将enable.auto.commit设置为false,如果只是在重启时重新读取所有消息,则根本不提交

    要重置偏移量,可以使用例如these methods

    public void seek(TopicPartition partition, long offset)
    public void seekToBeginning(TopicPartition... partitions)