使用kafka 0.8.2.0跟踪主题大小和用户延迟

2024-10-01 17:33:22 发布

您现在位置:Python中文网/ 问答频道 /正文

自从kafka0.8.2.0以来,追踪消费者的滞后和话题的大小似乎变得非常困难

你如何跟踪卡夫卡的偏移量(主题大小)和滞后?您是否在生产者插入消息时在某处增加一个计数器,而在消费者确认消息时增加另一个计数器?在

{1{1}你为什么总是用这些度量来做呢?在

我们的消费者和生产者是用python编写的,使用kafka-python,他们声明他们不支持ConsumerCoordinator偏移API,所以我设计了一个解决方案,查询zookeeper并将这些指标发送到statsd实例(看起来很尴尬),但我仍然缺少主题大小度量。在

我们使用collectd来收集系统指标,我没有JMX的经验,在collectd中配置它似乎很复杂,我尝试了几次,所以我找到了一些方法来避免它。在

如果你有任何输入,我很乐意听到,即使是:“这属于x stackexchange网站”


Tags: kafkaapi声明消息主题度量计数器消费者
3条回答

如果我没听错,你可以用FetchResponse中的HighwaterMarkOffset。这样您就可以知道分区末尾的偏移量是多少,并可以将其与当前确认的偏移量或此FetchResponse中最后一条消息的偏移量进行比较。在

详细信息here

下面是代码片段,请确保在活动控制器中运行此代码段。 引导服务器是活动控制器IP。在

client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, request_timeout_ms=300)
      list_groups_request  = client.list_consumer_groups()

      for group in list_groups_request:
        if group[1] == 'consumer':
          list_mebers_in_groups = client.describe_consumer_groups([group[0]])
          (error_code, group_id, state, protocol_type, protocol, members) = list_mebers_in_groups[0]

          if len(members) !=0:
            for member in members:
              (member_id, client_id, client_host, member_metadata, member_assignment) = member
              member_topics_assignment = []
              for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
                member_topics_assignment.append(topic)

              for topic in member_topics_assignment:
                consumer = KafkaConsumer(
                          bootstrap_servers=BOOTSTRAP_SERVERS,
                          group_id=group[0],
                          enable_auto_commit=False
                          )
                consumer.topics()

                for p in consumer.partitions_for_topic(topic):
                  tp = TopicPartition(topic, p)
                  consumer.assign([tp])
                  committed = consumer.committed(tp)
                  consumer.seek_to_end(tp)
                  last_offset = consumer.position(tp)
                  if last_offset != None and committed != None:
                    lag = last_offset - committed
                    print "group: {} topic:{} partition: {} lag: {}".format(group[0], topic, p, lag)

您是否尝试过使用https://github.com/quantifind/KafkaOffsetMonitor监视使用者延迟。它适用于0.8.2.0

相关问题 更多 >

    热门问题