我已经写了一个代码,它正在使用kafka主题中的日志,有时我发现应用程序无法使用日志,有时它工作正常
卡夫卡构型
Replication=3
Number of Partitions=42
Sum of partition offsets=0
Total number of Brokers=5
Number of Brokers for Topic=5
Preferred Replicas %=100
Brokers Skewed %=0
Brokers Leader Skewed %=0
Brokers Spread %=100
Under-replicated %=0
retention.ms=86400000
retention.bytes=102400
代码
brokerUri='localhost:2019'
topic='test_able'
def run(brokerUri,topic):
consumer = KafkaConsumer(bootstrap_servers=brokerUri, auto_offset_reset='earliest',
consumer_timeout_ms=10000, enable_auto_commit=False)
#,value_deserializer=lambda m: json.loads(m.decode('utf-8')))
consumer.subscribe([topic])
print(consumer)
for message in consumer:
print (message.value)
consumer.close()
run(brokerUri,topic)
另一方面,当不使用任何消息而不使用任何错误消息时,有时应用程序能够成功地使用日志
目前没有回答
相关问题 更多 >
编程相关推荐