无法使用汇合Kafkapython从Kafka主题消费到新的consumergroup

2024-10-01 17:28:59 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图使用融合的Kafka python从一个Kafka主题消费到一个全新的消费群体(下面显示的示例中的group_NAME_CONNECT5)。它似乎不起作用,除非我首先使用卡夫卡控制台消费者使用这个新的消费群体!在我仅使用kafka控制台使用者一次之后,融合的kafka python使用者工作正常!!知道为什么吗

特性:

[卡夫卡]

引导\u服务器=

ssl\u ca\u位置=

最大等待周期=30

组名称=组名称连接5

客户号=客户号

自动提交间隔时间=5000

代码:

kafkaConsumerConfig = {
    'bootstrap.servers': config.BOOTSTRAP_SERVERS,
    'group.id': config.GROUP_NAME,
    'client.id': config.CLIENT_ID,
    'session.timeout.ms': 60000,
    'heartbeat.interval.ms': 3000,

    'security.protocol': 'SASL_SSL',
    'sasl.kerberos.service.name': 'kafka',

    'sasl.mechanisms': 'GSSAPI',
    'ssl.ca.location': config.SSL_CA_LOCATION,
    'sasl.kerberos.kinit.cmd': 'kinit -S {0} {1} -k -t {2}'.format(config.KEYTAB_PRINCIPAL, config.KEYTAB_USER, config.KEYTAB_PATH),
    'default.topic.config': {
        'enable.auto.commit': 'false',
        'enable.auto.offset.store': 'false'
    }

c = Consumer(**self.kafkaConsumerConfig)

c.subscribe([self.TOPIC_NAME])

while True:
    kafka_msg = c.poll(1.0)
    process_message(kafka_msg)

日志:在运行Python使用者5分钟后(然后杀死它)。Python使用者无法使用任何消息

%7 | 1579623452.300 |初始化|客户端ID#消费者-1 |[thrd:app]:librdkafka v1.2.1(0x10201ff)客户端ID#消费者-1已初始化(内置功能gzip、snappy、ssl、sasl、regex、lz4、sasl_gssapi、sasl_平原、sasl_紧急停堆、插件、sasl_OAuthBear、GCC GXX PKGCONFIG安装GNULD LDS LIBDL插件ZLIB ssl sasl_CYRUS HDRHISTOGRAM snappy SOCKEM sasl_紧急停堆sasl_OAuthBear CRC32C_HW,调试0x2000) %7 | 1579623452.303 |订阅|客户| ID#消费者-1 |[thrd:main]:组“组名|连接5”:订阅1个主题的新订阅(加入状态初始) %7 | 1579623452.303 |重新平衡|客户ID |消费者-1 |[thrd:main]:组“组名称|连接5”正在状态init(加入状态init)中重新平衡,无分配:取消订阅 %7 | 1579623453.441 |加入|客户| ID#消费者-1 |[thrd:main]:组“组名|连接5”:推迟加入,直到有最新元数据可用 %7 | 1579623453.443 |重新加入|客户ID |消费者-1 |[thrd:main]:组“组名称|连接5”:根据元数据更改更新的订阅:重新加入组 %7 | 1579623453.443 |重新平衡|客户| ID |消费者-1 |[thrd:main]:组“组|名称|连接5”正在向上状态(连接状态初始化)下重新平衡,无需分配:组重新连接 %7 | 1579623455.300 |加入|客户ID#消费者-1 |[thrd:main]:sasl_ssl://xxxx/159: 加入组“组名连接5”并订阅1个主题 %7 | 1579623458.305 |转让人|客户| ID |消费者-1 |[thrd:main]:组“组名称”|连接5:“范围”转让人为1个成员运行 %7 | 1579623458.309 |分配|客户机ID |消费者-1 |[thrd:main]:组“组名称(U连接5):新分配连接状态下的8个分区等待-分配-再平衡| %7 | 1579623458.309 |偏移量|客户端| ID#消费者-1 |[thrd:main]:GroupCoordinator/159:获取8/8分区的已提交偏移量 %7 | 1579623458.312 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[2]在偏移量123698处开始取数 %7 | 1579623458.313 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[7]在偏移量116555处开始取数 %7 | 1579623458.465 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[4]在偏移量106800处开始取数 %7 | 1579623458.484 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[1]在偏移量107557处开始取数 %7 | 1579623458.485 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[6]在偏移量109805处开始取数 %7 | 1579623458.486 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|mthly[0]在偏移量91465处开始取数 %7 | 1579623458.487 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[3]在偏移量102042处开始取数 %7 | 1579623458.487 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo _cnsld _rpt |[5]在偏移量117214处开始取数

卡夫卡消费团体

../bin/kafka-consumer-groups.sh--引导服务器xxx--命令配置producer.properties--组组\u NAME\u CONNECT5--描述

消费者组“组名连接5”没有活动成员

然后,我使用相同的新使用者组(group_NAME_CONNECT5)运行kafka控制台使用者。它使用主题中的所有消息

../bin/kafka-consumer-groups.sh--引导服务器xxx--命令配置producer.properties--组组\u NAME\u CONNECT5--描述 OpenJDK 64位服务器VM警告:如果预期处理器数量将从一个增加,则应使用-XX:ParallelGCThreads=N适当配置并行GC线程的数量 消费者组“组名连接5”没有活动成员

主题分区当前偏移量日志结束偏移量滞后消费者ID主机客户端ID che_silo_cnsld_rpt_mthly 1 112644 112644 0--- che_silo_cnsld_rpt_mthly 5 123099 123099 0--- che_silo_cnsld_rpt_mthly 0 95715 95715 0--- che_silo_cnsld_rpt_mthly 3 106932 106932 0--- che_silo_cnsld_rpt_mthly 4 112588 112588 0--- che_silo_cnsld_rpt_mthly 7 122047 122047 0--- che_silo_cnsld_rpt_mthly 2 129940 129940 0--- che_silo_cnsld_rpt_mthly 6 115050 115050 0--

然后,我在主题中生成一些新消息,并运行python消费者…而且,这次python消费者成功运行

%7 | 1579624630.644 |初始化|客户端ID |消费者-1 |[thrd:app]:librdkafka v1.2.1(0x10201ff)客户端ID |消费者-1已初始化(内置功能gzip、snappy、ssl、sasl、regex、lz4、sasl_gssapi、sasl_平原、sasl_紧急停堆、插件、sasl_OAuthBear、GCC GXX PKGCONFIG安装GNULD LDS LIBDL插件ZLIB ssl sasl_CYRUS HDRHISTOGRAM snappy SOCKEM sasl_紧急停堆sasl_OAuthBear CRC32C_HW,调试0x2000) %7 | 1579624630.648 |订阅|客户ID |消费者-1 |[thrd:main]:组“组名|连接5”:订阅1个主题的新订阅(加入状态初始) %7 | 1579624630.648 |重新平衡|客户| ID |消费者-1 |[thrd:main]:组“组|名称|连接5”正在状态init(加入状态init)中重新平衡,无分配:取消订阅 %7 | 1579624631.807 |加入|客户| ID |消费者-1 |[thrd:main]:组“组名|连接5”:推迟加入,直到有最新元数据可用 %7 | 1579624631.808 |重新加入|客户ID |消费者-1 |[thrd:main]:组“组名称|连接5”:根据元数据更改更新的订阅:重新加入组 %7 | 1579624631.808 |重新平衡|客户| ID |消费者-1 |[thrd:main]:组“组名|连接5”正在状态上升(连接状态初始)下重新平衡,无需分配:组重新连接 %7 | 1579624633.644 |加入|客户ID |消费者-1 |[thrd:main]:sasl_ssl://cilhdkfs0304.sys.cigna.com:9095/159: 加入组“组名连接5”并订阅1个主题 %7 | 1579624636.650 |转让人|客户| ID |消费者-1 |[thrd:main]:组“组名称”|连接5:“范围”转让人为1个成员运行 %7 | 1579624636.654 |分配|客户端| ID |消费者-1 |[thrd:main]:组“组名|连接5”:新分配连接状态下的8个分区等待-分配-再平衡| %7 | 1579624636.654 |偏移量|客户机ID |消费者-1 |[thrd:main]:GroupCoordinator/159:获取8/8分区的已提交偏移量 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|m[0]在偏移量91465处开始取数 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|[1]在偏移量107557处开始取数 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|[2]在偏移量123698处开始取数 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|[3]在偏移量102042处开始取数 %7 | 1579624636.656 |获取|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld|u rpt|[4]开始获取g偏移量106800处 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|[5]在偏移量117214处开始取数 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区cheu silo_cnsld_rpt|[6]在偏移量109805处开始取数 %7 | 1579624636.656 |取数|客户ID |消费者-1 |[thrd:main]:分区che|u silo_cnsld_rpt|[7]在偏移量116555处开始取数


Tags: 名称id客户main状态消费者che偏移量
1条回答
网友
1楼 · 发布于 2024-10-01 17:28:59

我猜您的问题与“自动.偏移.重置”配置有关。当消费者加入集群时,此配置定义消费者将从哪个偏移开始消费。试着放:

"auto.offset.reset": "earliest"

此配置将使您的消费者从第一个(最早的)偏移量/消息开始消费。默认配置为“最大”,这意味着当主题上有新消息可用时,消费者将开始消费。查看this了解更多详细信息

相关问题 更多 >

    热门问题