使用邮件时AIOKafka库中的UnknownMemberId错误

2024-10-03 15:24:01 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Python中的AIOKafka库(最后的版本)中遇到了一个错误。基本上,我接收到一条失败的心跳消息,然后无法执行偏移量的提交。这是日志:

Heartbeat failed for group my-group-dag-kafka because it is rebalancing
Heartbeat failed: local member_id was not recognized; resetting and re-joining group
Heartbeat session expired - marking coordinator dead
Marking the coordinator dead (node 1)for group my-group-dag-kafka.
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
OffsetCommit failed for group my-group-dag-kafka due to group error ([Error 25] UnknownMemberIdError: my-group-dag-kafka), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Traceback (most recent call last):
  File "/app/manage.py", line 23, in <module>
    server.serve()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 146, in serve
    self._start_mq()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 186, in _start_mq
    self.loop.run_until_complete(self.async_mq_loop())
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/libbase/base.py", line 202, in async_mq_loop
    async for message in self.consumer:
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1220, in __anext__
    return (yield from self.getone())
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/consumer.py", line 1101, in getone
    msg = yield from self._fetcher.next_record(partitions)
  File "/usr/local/lib/python3.7/site-packages/aiokafka/consumer/fetcher.py", line 1051, in next_record
    yield from waiter
kafka.errors.UnknownMemberIdError: [Error 25] UnknownMemberIdError: my-group-dag-kafka
Unclosed AIOKafkaConsumer
consumer: <aiokafka.consumer.consumer.AIOKafkaConsumer object at 0x7fd58b49de10>
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fd58bd9c4d0>
Task was destroyed but it is pending!
task: <Task pending coro=<Sender._sender_routine() running at /usr/local/lib/python3.7/site-packages/aiokafka/producer/sender.py:151> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd58ab91ad0>()]> cb=[Sender._fail_all()]>

python版本为3.7.4,AIOKafka版本为0.5.2,使用者配置为

self.consumer = AIOKafkaConsumer(
            self.config.KAFKA_TOPIC,
            loop=self.loop,
            bootstrap_servers=self.config.KAFKA_URL,
            group_id=self.config.KAFKA_GROUP_ID,
            fetch_min_bytes=100000)

错误是自发发生的。我试图改变这个配置

heartbeat.interval.ms
session.timeout.ms
max.poll.interval.ms

但我相信标准值应该是可以的。这个错误是怎么说的?消费者不再是一个群体的一部分,而这个循环刚刚结束?。顺便说一句,这个错误发生后,容器就死了


        async for message in self.consumer:
            try:
                asyncio.create_task(self.async_handle_message(message))
            except Exception as e:
                import traceback

                traceback.print_exc()
                print('Exception: ', e, flush=True)

任何调试问题或优化配置的指导都会得到很好的接受。祝你今天愉快:)


Tags: kafkatoselfformygrouperrorwill