有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

卡夫卡如何解读消费者的话题

我正试图找出我目前的高层次消费者正在进行哪些补偿。我使用Kafka 0.8.2.1,在服务器中设置了“offset.storage”(偏移量存储)。卡夫卡的属性——我认为,这意味着偏移量存储在卡夫卡中。(我还通过检查Zk shell中的这个路径来验证Zookeeper中没有存储偏移:/consumers/consumer_group_name/offsets/topic_name/partition_number

我试着听一下__consumer_offsets主题,看看哪个消费者保存了多少补偿值,但它不起作用

我尝试了以下方法:

为控制台使用者创建了配置文件,如下所示:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false

并尝试了控制台使用者脚本的两个版本:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config

两者都不起作用-它只是放在那里,但不打印任何内容,即使消费者正在积极消费/节省补偿

我是否缺少一些其他配置/属性

谢谢

码头


共 (5) 个答案

  1. # 1 楼答案

    从卡夫卡0.11开始the (Scala) source code can be found here

    对于那些需要Java翻译的用户,可以从任何消费进程获得一个ConsumerRecord<byte[], byte[]> consumerRecord,您可以使用

    1. 获取密钥(首先检查密钥是否不是null)并使用GroupMetadataManager.readMessageKey(consumerRecord.key)。它可以返回不同的类型,所以选中if ( ... instanceof OffsetKey),然后强制转换它,您可以从中获得不同的值

    2. 要获取偏移量的卡夫卡记录值,可以使用String.valueOf(GroupMetadataManager.readOffsetMessageValue(consumerRecord.value))

    从Scala代码翻译过来的最简单的Java示例

    byte[] key = consumerRecord.key;
    if (key != null) {
        Object o = GroupMetadataManager.readMessageKey(key);
        if (o != null && o instanceOf OffsetKey) {
            OffsetKey offsetKey = (OffsetKey) o;
            Object groupTopicPartition = offsetKey.key;
            byte[] value = consumerRecord.value;
            String formattedValue = String.valueOf(GroupMetadataManager.readOffsetMessageValue(value);
           // TODO: Print, store, or compute results with the new key and value 
        }
    }
    

    注意,也可以使用AdminClient API来描述组,而不是使用这些原始消息


    Scala源代码提取

    def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) {
      Option(consumerRecord.key).map(key => GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key))).foreach {
        // Only print if the message is an offset record.
        // We ignore the timestamp of the message because GroupMetadataMessage has its own timestamp.
        case offsetKey: OffsetKey =>
          val groupTopicPartition = offsetKey.key
          val value = consumerRecord.value
          val formattedValue =
            if (value == null) "NULL"
            else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
          output.write(groupTopicPartition.toString.getBytes(StandardCharsets.UTF_8))
          output.write("::".getBytes(StandardCharsets.UTF_8))
          output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
          output.write("\n".getBytes(StandardCharsets.UTF_8))
        case _ => // no-op
      }
    
  2. # 2 楼答案

    我在尝试从\uu consumer\u offset主题消费时遇到了这个问题。 我设法为不同的卡夫卡版本找到了答案,并想与大家分享我的发现

    对于卡夫卡0.8.2。x

    注意:这使用Zookeeper连接

    #Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    #Consume all offsets
    ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
    --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" \
    --zookeeper localhost:2181 --topic __consumer_offsets --from-beginning
    

    卡夫卡0.9。x、 x和0.10。x、 x

    #Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    #Consume all offsets
    ./kafka-console-consumer.sh --new-consumer --consumer.config /tmp/consumer.config \
    --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" \
    --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
    

    为0.11。x、 x-2。x

    #Create consumer config
    echo "exclude.internal.topics=false" > /tmp/consumer.config
    #Consume all offsets
    ./kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
    --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
    --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning
    
  3. # 3 楼答案

    如果您添加--from-beginning,它很可能会给您一些结果,至少在我自己尝试时是这样。或者,如果您没有提供该参数,但在消费者侦听时读取了更多消息(并触发了偏移量提交),那么也应该在那里显示消息

  4. # 4 楼答案

    好的,我已经知道问题出在哪里了。我的卡夫卡实际上是使用Zookeeper作为偏移存储,而不是卡夫卡。。。。我没有立即检测到的原因是我错误地检查了ZK内容:

    我在做什么

    ls  /consumers/consumer_group_name/offsets/topic_name/partition_number
    

    在那里什么也看不见。相反,我必须“获取”内容——这确实为我的消费者显示了正确的补偿,如下所示:

    get /consumers/consumer_group_name/offsets/topic_name/partition_number 
    185530404
    cZxid = 0x70789ad05
    ctime = Mon Nov 23 17:49:46 GMT 2015
    mZxid = 0x7216cdc5c
    mtime = Thu Dec 03 20:18:57 GMT 2015
    pZxid = 0x70789ad05
    cversion = 0
    dataVersion = 3537384
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 9
    numChildren = 0
    
  5. # 5 楼答案

    卡夫卡2号。X使用下面的命令

    kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter"