java Kafka新使用者:(使用assign而非subscribe重新)设置和提交偏移量
使用新的Kafka Java consumer api,我运行一个消费者来消费消息。当所有可用的消息都被使用时,我用kill -15
终止它
现在我想重置偏移以开始。我希望避免只使用不同的消费群体。我尝试的是以下一系列调用,使用与刚刚读取完数据的消费者相同的组
assign(topicPartition);
OffsetAndMetadata om = new OffsetAndMetadata(0);
commitSync(Collections.singletonMap(topicPartition, 0));
我以为我在测试中得到了这个结果,但现在我总是得到:
ERROR internals.ConsumerCoordinator: Error UNKNOWN_MEMBER_ID occurred while committing offsets for group queue
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
原则上把assign
和commitSync
结合起来是错误的吗?可能是因为只有subscribe
和commitSync
结合在一起?文档只说assign
不符合subscribe
,但我认为这只适用于一个消费者流程。(事实上,我甚至希望在另一个耗电元件启动时运行偏移重置耗电元件,希望另一个可能会注意到偏移量的变化并重新开始。但是先关闭它也可以。)
有什么想法吗
# 1 楼答案
发现了问题。鉴于我们尊重以下条件,我的问题中描述的方法效果良好:
可能没有其他消费者使用目标
group.id
运行。即使消费者只订阅了其他主题,这也会妨碍在调用assign()
而不是subscribe()
后提交主题偏移量在最后一个消费者停止后,需要30秒(我认为是^{)操作才能成功。来自卡夫卡的指示性日志消息是
一旦这出现在日志中,序列
你可以成功
# 2 楼答案
为什么一开始就要提交偏移量呢? 在
Properties
中将enable.auto.commit
设置为false
,如果只是在重启时重新读取所有消息,则根本不提交要重置偏移量,可以使用例如these methods: