CommitFailedError when reading messages from Kafka (Debezium) in Python

Posted 2024-06-24 12:13:57


I am building a Python Kafka consumer for Debezium messages. Currently I am facing the following problem:

When 1000 records are inserted in a row, Debezium produces 1000 change messages. The Python Kafka consumer starts processing them one by one but fails to get through all of them. In fact, it stops somewhere between message 650 and 750 and starts over again at around message 150, so it is effectively stuck in a loop. The consumer configuration looks like this:

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic',
                         bootstrap_servers='',
                         auto_offset_reset='earliest',
                         group_id='debezium',
                         enable_auto_commit=True)

for message in consumer:
    # do something with the message
    pass

I suspected the consumer was not committing its offsets correctly, so I changed the configuration to the following:

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic',
                         bootstrap_servers='',
                         auto_offset_reset='earliest',
                         group_id='debezium',
                         enable_auto_commit=False)

for message in consumer:
    # do something with the message

    consumer.commit()

Now it commits the offsets it should, and it no longer gets stuck in a loop. However, somewhere between message 650 and 750 it raises the following error:

CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.
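The error message itself suggests two knobs: raise `max_poll_interval_ms` or lower `max_poll_records`. A minimal sketch of how those could be applied to the consumer above (the specific values here are illustrative assumptions, not recommendations from the question):

    # Tuning sketch based on the error message's own advice: give each
    # poll loop more time before a rebalance, and fetch smaller batches
    # so processing + commit finishes within that window.
    consumer_config = dict(
        bootstrap_servers='',            # fill in your brokers, as in the question
        auto_offset_reset='earliest',
        group_id='debezium',
        enable_auto_commit=False,
        max_poll_interval_ms=600_000,    # default is 300000 (5 min); doubled here
        max_poll_records=100,            # default is 500; smaller batches per poll()
    )

    # consumer = KafkaConsumer('topic', **consumer_config)
    # for message in consumer:
    #     ...  # process the message
    #     consumer.commit()

Whether to prefer one knob over the other depends on how long a single message takes to process; if per-message work is slow, a smaller `max_poll_records` alone may be enough.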

Any suggestions?

Thanks in advance
