<p>下面是代码片段,请确保在活动控制器(active controller)上运行此代码段。
引导服务器(bootstrap servers)应设置为活动控制器的 IP。</p>
<pre><code>client = KafkaAdminClient(bootstrap_servers=BOOTSTRAP_SERVERS, request_timeout_ms=300)
list_groups_request = client.list_consumer_groups()
for group in list_groups_request:
if group[1] == 'consumer':
list_mebers_in_groups = client.describe_consumer_groups([group[0]])
(error_code, group_id, state, protocol_type, protocol, members) = list_mebers_in_groups[0]
if len(members) !=0:
for member in members:
(member_id, client_id, client_host, member_metadata, member_assignment) = member
member_topics_assignment = []
for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
member_topics_assignment.append(topic)
for topic in member_topics_assignment:
consumer = KafkaConsumer(
bootstrap_servers=BOOTSTRAP_SERVERS,
group_id=group[0],
enable_auto_commit=False
)
consumer.topics()
for p in consumer.partitions_for_topic(topic):
tp = TopicPartition(topic, p)
consumer.assign([tp])
committed = consumer.committed(tp)
consumer.seek_to_end(tp)
last_offset = consumer.position(tp)
if last_offset != None and committed != None:
lag = last_offset - committed
print "group: {} topic:{} partition: {} lag: {}".format(group[0], topic, p, lag)
</code></pre>