<p>我试着自己解决。我修改了一点以适应我的需要。这里有一个解决方法。</p>
<pre><code>import io
import struct
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError
# Please adjust your server and url
# KAFKA BROKER URL
# Kafka consumer configuration.
# NOTE(review): port 9021 is usually Confluent Control Center; brokers
# normally listen on 9092 — confirm against your cluster.
consumer = Consumer({
'bootstrap.servers': 'localhost:9021',
'group.id': 'abcd'
})
# Client for the Confluent Schema Registry; used by unpack() to fetch
# writer schemas by id.
register_client = CachedSchemaRegistryClient(url="http://localhost:8081")
consumer.subscribe(['YOUR TOPIC'])
# First byte of the Confluent Avro wire format (magic byte) is always 0.
MAGIC_BYTES = 0
def unpack(payload):
    """Decode a raw Kafka message payload (key or value).

    Confluent's Avro wire format is: 1 magic byte (0), a 4-byte
    big-endian schema id, then the Avro-encoded body. Payloads that do
    not start with the magic byte are treated as plain string keys whose
    trailing 8 bytes are a timestamp and are stripped.

    :param payload: raw bytes from msg.key()/msg.value(), or None.
    :return: decoded Avro datum, the key string, or None for a missing
        payload (e.g. a message produced without a key).
    """
    # A message with no key yields payload=None; struct.unpack would
    # raise TypeError on it, so short-circuit instead.
    if payload is None:
        return None
    magic, schema_id = struct.unpack('>bi', payload[:5])
    if magic == MAGIC_BYTES:
        # Avro value: fetch the writer schema from the registry by id,
        # then decode everything after the 5-byte header.
        schema = register_client.get_by_id(schema_id)
        reader = DatumReader(schema)
        decoder = BinaryDecoder(io.BytesIO(payload[5:]))
        return reader.read(decoder)
    # String key: the last 8 bytes carry a timestamp — drop them.
    return payload[:-8].decode()
def get_data():
    """Poll the subscribed topic forever, printing each (key, value) pair.

    Uses the module-level `consumer` and `unpack`. Returns on the first
    consumer error; the consumer is closed on exit so final offsets are
    committed and the group is left cleanly.
    """
    try:
        while True:
            try:
                msg = consumer.poll(10)
            except SerializerError as e:
                # Don't reference `msg` here: it is unbound if poll()
                # raised on the first iteration.
                print("Message deserialization failed: {}".format(e))
                # Bare raise preserves the original exception and
                # traceback (raising the class would discard both).
                raise
            # poll() returns None when the timeout expires with no
            # message available — not an error, just keep polling.
            if msg is None:
                continue
            if msg.error():
                print("AvroConsumer error: {}".format(msg.error()))
                return
            key, value = unpack(msg.key()), unpack(msg.value())
            print(key, value)
    finally:
        consumer.close()

if __name__ == '__main__':
    get_data()
</code></pre>
<p>要了解它发生的原因,可以在我的<a href="https://mlnotetaking.com/fixing-kafka-string-key-and-avro-value-python/" rel="nofollow noreferrer">blog</a>上阅读</p>