当使用kafka主题中的日志时,应用程序有时无法使用该日志

2024-10-01 00:34:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我已经写了一个代码,它正在使用kafka主题中的日志,有时我发现应用程序无法使用日志,有时它工作正常

卡夫卡构型

Replication=3
Number of Partitions=42
Sum of partition offsets=0
Total number of Brokers=5
Number of Brokers for Topic=5
Preferred Replicas %=100
Brokers Skewed %=0
Brokers Leader Skewed %=0
Brokers Spread %=100
Under-replicated %=0

retention.ms=86400000
retention.bytes=102400

代码

brokerUri='localhost:2019'
topic='test_able'
def run(brokerUri,topic):

        consumer = KafkaConsumer(bootstrap_servers=brokerUri, auto_offset_reset='earliest',
                                 consumer_timeout_ms=10000, enable_auto_commit=False)
                                 #,value_deserializer=lambda m: json.loads(m.decode('utf-8')))

        consumer.subscribe([topic])
        print(consumer)
        for message in consumer:
            print (message.value)

        consumer.close()

run(brokerUri,topic)

另一方面,当不使用任何消息而不使用任何错误消息时,有时应用程序能够成功地使用日志


Tags: ofrun代码应用程序numberforautotopic