卡夫卡如何解读消费者的话题
我正试图找出我目前的高层次消费者正在进行哪些补偿。我使用Kafka 0.8.2.1,在服务器中设置了“offset.storage”(偏移量存储)。卡夫卡的属性——我认为,这意味着偏移量存储在卡夫卡中。(我还通过检查Zk shell中的这个路径来验证Zookeeper中没有存储偏移:/consumers/consumer_group_name/offsets/topic_name/partition_number
)
我试着听一下__consumer_offsets
主题,看看哪个消费者保存了多少补偿值,但它不起作用
我尝试了以下方法:
为控制台使用者创建了配置文件,如下所示:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
并尝试了控制台使用者脚本的两个版本:
#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
两者都不起作用-它只是放在那里,但不打印任何内容,即使消费者正在积极消费/节省补偿
我是否缺少一些其他配置/属性
谢谢
码头
# 1 楼答案
从卡夫卡0.11开始the (Scala) source code can be found here
对于那些需要Java翻译的用户,可以从任何消费进程获得一个
ConsumerRecord<byte[], byte[]> consumerRecord
,您可以使用获取密钥(首先检查密钥是否不是null)并使用
GroupMetadataManager.readMessageKey(consumerRecord.key)
。它可以返回不同的类型,所以选中if ( ... instanceof OffsetKey)
,然后强制转换它,您可以从中获得不同的值要获取偏移量的卡夫卡记录值,可以使用
String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))
从Scala代码翻译过来的最简单的Java示例
注意,也可以使用AdminClient API来描述组,而不是使用这些原始消息
Scala源代码提取
# 2 楼答案
我在尝试从\uu consumer\u offset主题消费时遇到了这个问题。 我设法为不同的卡夫卡版本找到了答案,并想与大家分享我的发现
对于卡夫卡0.8.2。x
注意:这使用Zookeeper连接
卡夫卡0.9。x、 x和0.10。x、 x
为0.11。x、 x-2。x
# 3 楼答案
如果您添加
--from-beginning
,它很可能会给您一些结果,至少在我自己尝试时是这样。或者,如果您没有提供该参数,但在消费者侦听时读取了更多消息(并触发了偏移量提交),那么也应该在那里显示消息# 4 楼答案
好的,我已经知道问题出在哪里了。我的卡夫卡实际上是使用Zookeeper作为偏移存储,而不是卡夫卡。。。。我没有立即检测到的原因是我错误地检查了ZK内容:
我在做什么
在那里什么也看不见。相反,我必须“获取”内容——这确实为我的消费者显示了正确的补偿,如下所示:
# 5 楼答案
卡夫卡2号。X使用下面的命令
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"