<p>You need to use the regular <code>Producer</code> and perform the Avro serialization yourself.</p>
<pre><code>from confluent_kafka import Producer, avro
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer

# Assumed to be defined by the caller: bootstrap_servers, schema_registry_url, topic, key, value, delivery_report
schema_registry = CachedSchemaRegistryClient({'url': schema_registry_url})
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema  # bound method that does the encoding
value_schema = avro.load('avro_schemas/value.avsc')  # TODO: create the avro_schemas folder

p = Producer({'bootstrap.servers': bootstrap_servers})
value_payload = serialize_avro(topic, value_schema, value, is_key=False)  # Avro-encoded bytes
p.produce(topic, key=key, value=value_payload, callback=delivery_report)
p.flush()  # wait for outstanding deliveries so the callback gets a chance to run
</code></pre>
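<p>The snippet passes a <code>delivery_report</code> callback without defining it. A minimal sketch of one possible callback follows. The helper name <code>format_delivery</code> is hypothetical and not part of confluent_kafka; the only assumption about the library is that the callback receives <code>(err, msg)</code> and that <code>msg</code> exposes <code>topic()</code>, <code>partition()</code> and <code>offset()</code>.</p>

```python
def format_delivery(err, msg):
    """Build a one-line status string for a delivery result (hypothetical helper)."""
    if err is not None:
        # On failure the error is the only thing reported here.
        return f"Delivery failed: {err}"
    return f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"


def delivery_report(err, msg):
    """Callback in the (err, msg) shape that Producer.produce(callback=...) invokes."""
    print(format_delivery(err, msg))
```

<p>The callback only fires while the producer is being served, so it runs during calls such as <code>p.poll(0)</code> or <code>p.flush()</code>, not at the moment <code>produce()</code> returns.</p>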