卡夫卡消费者:如何在Python中读取特定的Avro字段?

2024-09-29 21:28:54 发布

您现在位置:Python中文网/ 问答频道 /正文

在下面消费者的片段中,我能够接收发送的数据。如何从要使用的整个数据中访问特定值

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
                                             KeySerializerError,
                                             ValueSerializerError)

***
***
***

c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)
        print(msg)

谢谢


Tags: kafka数据fromimport消费者msgsubscribeavro
2条回答

实际上有两种方法可以实现这一点:

msg.value()['myFieldName']

msg.value().get('myFieldName')

比如说,

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'localhost:8081',
    'group.id': 'test-group'
})


c.subscribe(['Topic'])

while True:
    try:
        msg = c.poll(10)

        if msg:
            print(f"field1 Value: {msg.value()['field1']}")
            print(f"field2 Value: {msg.value().get('field2')}")

        else: 
            pass
    except SerializerError as e:
        print(f"Message deserialization failed for message {msg}:\n{e}")

我看到你正在导入AvroConsumer,所以你应该

c.value()['field'] 

相关问题 更多 >

    热门问题