Kafka: message does not start with magic byte

Posted 2024-09-30 06:16:44


I am trying to consume data from a topic that was processed with KSQL, but it is not working.

I used KSQL to set up a table named api_table. Here are the details:

ksql> show topics;

 Kafka Topic   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-------------------------------------------------------------------------------------------
 _schemas      | false      | 1          | 1                  | 0         | 0              
 api_log       | true       | 1          | 1                  | 1         | 1              
 API_STREAM    | false      | 1          | 1                  | 0         | 0              
 API_STREAM_KEY| true       | 1          | 1                  | 1         | 1              
 API_TABLE     | true       | 1          | 1                  | 0         | 0              
 mysql-config  | false      | 1          | 1                  | 0         | 0              
 mysql-offsets | false      | 25         | 1                  | 0         | 0              
 mysql-status  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------

This is my table format.

^{pr2}$

Everything looks fine, and I can even print the messages from KSQL.

However, when I try to consume the messages with Python:

from confluent_kafka import KafkaError
import io
from confluent_kafka import Consumer
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9021',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'abcd'
})

consumer.subscribe(['API_TABLE'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()

It fails with the following error. Why?

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
    decoded_key = self._serializer.decode_message(message.key(), is_key=True)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
    raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
    msg = consumer.poll(10)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
    e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte

3 Answers

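Does Confluent Avro not support the byte type here? If so, libraries that assume the Confluent Avro format cannot read such a message.

See https://github.com/confluentinc/ksql/issues/1282. You can work around this by doing the encoding/decoding yourself with the Confluent library, https://github.com/confluentinc/confluent-kafka-python.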

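The problem is that while KSQL writes the value as Avro, the key is a STRING. The traceback shows the failure in the key path (decode_message(message.key(), is_key=True)): AvroConsumer tries to decode the key as Avro and does not find the magic byte.

This looks like the same issue, and there is a proposed PR to fix it: https://github.com/confluentinc/confluent-kafka-python/pull/650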
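
To check which case you are in, you can look at the first bytes of a raw payload. This is a minimal sketch, not part of the original answer. It assumes the Confluent framing of one zero magic byte followed by a 4-byte big-endian schema ID. `describe_payload` is a hypothetical helper name, and the sample payloads are built by hand.

```python
import struct

MAGIC_BYTE = 0  # assumed first byte of a Confluent-framed Avro message


def describe_payload(payload):
    """Return ('avro', schema_id) for a framed payload, else ('raw', None)."""
    # A framed payload needs at least 1 magic byte + 4 schema-ID bytes
    if payload is not None and len(payload) >= 5:
        magic, schema_id = struct.unpack('>bI', payload[:5])
        if magic == MAGIC_BYTE:
            return 'avro', schema_id
    return 'raw', None


# Hand-built sample payloads, for illustration only
framed = struct.pack('>bI', MAGIC_BYTE, 42) + b'\x02'
plain_key = b'user-1234'

print(describe_payload(framed))
print(describe_payload(plain_key))
```

Running this on `msg.key()` and `msg.value()` from a plain `Consumer` should tell you which of the two carries the Avro framing.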

I tried to solve it myself and adapted the approach a little to fit my needs. Here is a workaround.

import io
import struct

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka import Consumer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

# Please adjust your server and url

# KAFKA BROKER URL
consumer = Consumer({
    'bootstrap.servers': 'localhost:9021',
    'group.id': 'abcd'
})

# SCHEMA URL 
register_client = CachedSchemaRegistryClient(url="http://localhost:8081")
consumer.subscribe(['YOUR TOPIC'])

MAGIC_BYTES = 0


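def unpack(payload):
    # Missing keys and tombstone values arrive as None
    if payload is None:
        return None

    # Avro format: 1 magic byte + 4-byte schema ID, then the Avro body.
    # Payloads shorter than 5 bytes cannot carry that header.
    if len(payload) >= 5:
        magic, schema_id = struct.unpack('>bi', payload[:5])
        if magic == MAGIC_BYTES:
            # Look up the writer schema in the schema registry
            schema = register_client.get_by_id(schema_id)
            reader = DatumReader(schema)
            decoder = BinaryDecoder(io.BytesIO(payload[5:]))
            return reader.read(decoder)

    # Otherwise it is a string key.
    # Timestamp is inside my key: drop its last 8 bytes before decoding
    return payload[:-8].decode()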


def get_data():
    try:
        while True:
            msg = consumer.poll(10)

            # poll() returns None when nothing arrives within the timeout
            if msg is None:
                continue

            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                return

            key, value = unpack(msg.key()), unpack(msg.value())
            print(key, value)
    finally:
        # Leave the consumer group cleanly
        consumer.close()

if __name__ == '__main__':
    get_data()

To understand why this happens, you can read about it on my blog.
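
The workaround above throws away the last 8 bytes of the key. If you also want that timestamp, here is a rough sketch. It assumes those 8 bytes are a big-endian signed 64-bit integer holding epoch milliseconds, which the answer does not confirm, so check it against your own data. `split_key` is a hypothetical helper and the sample key is built by hand.

```python
import struct
from datetime import datetime, timezone


def split_key(payload):
    """Split a raw key into (text, timestamp); the last 8 bytes are the timestamp."""
    if len(payload) < 8:
        raise ValueError("key is too short to carry an 8-byte timestamp")
    text = payload[:-8].decode('utf-8')
    # Assumption: big-endian signed 64-bit epoch milliseconds
    (millis,) = struct.unpack('>q', payload[-8:])
    return text, datetime.fromtimestamp(millis / 1000, tz=timezone.utc)


# Hand-built sample key, for illustration only
sample = b'GET /users' + struct.pack('>q', 1_600_000_000_000)
text, ts = split_key(sample)
print(text, ts.isoformat())
```

If the decoded timestamps look implausible, the byte layout assumption is probably wrong for your topic.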
