我试图使用KSQL从处理过的主题中获取数据。但是,它没有起作用。在
我使用KSQL设置了一个名为api_table的表。这是我的表的详细资料。在
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------
_schemas | false | 1 | 1 | 0 | 0
api_log | true | 1 | 1 | 1 | 1
API_STREAM | false | 1 | 1 | 0 | 0
API_STREAM_KEY| true | 1 | 1 | 1 | 1
API_TABLE | true | 1 | 1 | 0 | 0
mysql-config | false | 1 | 1 | 0 | 0
mysql-offsets | false | 25 | 1 | 0 | 0
mysql-status | false | 5 | 1 | 0 | 0
-------------------------------------------------------------------------------------------
这是我的表格格式。在
^{pr2}$一切都很好,我甚至可以把留言打印出来。在
但是,如果我尝试使用python来使用消息。在
from confluent_kafka import KafkaError
import io
from confluent_kafka import Consumer
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
consumer = AvroConsumer({
'bootstrap.servers': 'localhost:9021',
'schema.registry.url': 'http://localhost:8081',
'group.id': 'abcd'
})
consumer.subscribe(['API_TABLE'])
while True:
try:
msg = consumer.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value())
consumer.close()
它显示了这个错误。为什么?在
Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
decoded_key = self._serializer.decode_message(message.key(), is_key=True)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
msg = consumer.poll(10)
File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte
Confluentic AVRO不支持字节类型?因此,某些假定格式的库无法读取该格式。在
https://github.com/confluentinc/ksql/issues/1282您可以使用合并库对https://github.com/confluentinc/confluent-kafka-python进行编码/解码来解决这个问题
问题是,当KSQL将值写成Avro时,键是
STRING
。在这个问题看起来是一样的,有一个建议的PR来修复:https://github.com/confluentinc/confluent-kafka-python/pull/650
我试着自己解决。我修改了一点以适应我的需要。这里有一个解决方法。在
要了解它发生的原因,可以在我的blog上阅读
相关问题 更多 >
编程相关推荐