
Avro serializer and deserializer with the Kafka Java API

My Kafka Avro serializer and deserializer are not working. When I consume the topic with the kafka-console-consumer I can see that the messages are being published.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.PropertyAccessorFactory;

public class AvroProducer<T> {

    private static Properties props;
    static {
        props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro Producer");
    }

    private static KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    public byte[] createRecords(T pojo) throws IOException {
        // Load the Avro schema from the classpath
        Schema.Parser parser = new Schema.Parser();
        Schema schema = null;
        try {
            schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
        } catch (IOException e) {
            System.out.println(e.getLocalizedMessage());
        }

        // Copy each schema field from the POJO into a GenericRecord via Spring's reflection helper
        final GenericData.Record record = new GenericData.Record(schema);
        schema.getFields().forEach(r -> record.put(r.name(),
                PropertyAccessorFactory.forDirectFieldAccess(pojo).getPropertyValue(r.name())));

        // Encode the record as raw Avro binary (no schema registry, schema not embedded)
        SpecificDatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(schema);
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
            writer.write(record, encoder);
            encoder.flush();
            return os.toByteArray();
        }
    }

    public static void sendMessage(byte[] bytemessage) {
        // Key type is String to match the configured StringSerializer (the original
        // ProducerRecord<StringSerializer, byte[]> type parameter was wrong)
        ProducerRecord<String, byte[]> precord = new ProducerRecord<>("jason", bytemessage);
        producer.send(precord);
    }
}
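As an aside to the reflective field copy above: Avro's reflect API can serialize a POJO directly, without the Spring PropertyAccessorFactory step. A minimal sketch (hypothetical ReflectAvroEncoder class), assuming the POJO's field names and types line up with the schema, since otherwise the wire format changes:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.reflect.ReflectDatumWriter;

    public class ReflectAvroEncoder {

        // Derives the schema from the class itself rather than from syslog.avsc,
        // then encodes the POJO with the reflection-based writer.
        public static <T> byte[] encode(T pojo) throws IOException {
            Schema schema = ReflectData.get().getSchema(pojo.getClass());
            ReflectDatumWriter<T> writer = new ReflectDatumWriter<>(schema);
            try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
                BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
                writer.write(pojo, encoder);
                encoder.flush();
                return os.toByteArray();
            }
        }
    }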



import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AvroConsumer {

    private static Properties kafkaProps;

    static {
        kafkaProps = new Properties();
        kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne");
    }

    @SuppressWarnings("unchecked")
    public static void recieveRecord() throws IOException {
        try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
            kafkaConsumer.subscribe(Arrays.asList("jason"));
            while (true) {
                ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
                // The schema is re-parsed on every poll; it could be hoisted out of the loop
                Schema.Parser parser = new Schema.Parser();
                final Schema schema = parser.parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
                records.forEach(record -> {
                    SpecificDatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
                    ByteArrayInputStream is = new ByteArrayInputStream(record.value());
                    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null);
                    try {
                        // This cast is where the ClassCastException below is thrown
                        Syslog log = (Syslog) datumReader.read(null, binaryDecoder);
                        System.out.println("Value: " + log);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}

The stack trace is shown below; please see the details. Can someone guide me to the correct implementation? The problem seems to be the cast of the record. How do I access the value, and how do I read the data with a SpecificDatumReader?

Exception in thread "Thread-1" java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.cisco.daas.kafka.Syslog
at com.cisco.daas.kafka.AvroConsumer.lambda$0(AvroConsumer.java:46)
at java.lang.Iterable.forEach(Unknown Source)
at com.cisco.daas.kafka.AvroConsumer.recieveRecord(AvroConsumer.java:41)
at com.cisco.daas.kafka.MainApp$1.run(MainApp.java:32)
at java.lang.Thread.run(Unknown Source)

This is the schema I am trying to parse:

{
    "namespace": "com.example.syslogmessage",
    "type": "record",
    "name": "SysLogMessage",
    "fields": [{
            "name": "partyID",
            "type": "long"
        },
        {
            "name": "applianceID",
            "type": "string"
        },
        {
            "name": "message",
            "type": "string"
        },
        {
            "name": "severity",
            "type": "long"
        },
        {
            "name": "messageType",
            "type": "string"
        },
        {
            "name": "eventtime",
            "type": "long",
            "logicalType": "timestamp-millis"
        },
        {
            "name": "ipaddress",
            "type": "string"
        },
        {
            "name": "hostname",
            "type": "string"
        }
    ]
}
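Two notes on this schema. First, for eventtime the logicalType attribute has to be nested inside the type ("type": {"type": "long", "logicalType": "timestamp-millis"}); at the field level Avro ignores it. Second, and this is the likely root cause of the exception above: a SpecificDatumReader only returns instances of a generated class whose fully qualified name matches the schema's namespace and name, here com.example.syslogmessage.SysLogMessage rather than com.cisco.daas.kafka.Syslog. When no matching class is found on the classpath, Avro quietly falls back to GenericData.Record, which is exactly what the cast trips over. A minimal sketch of the specific-record path, assuming a SysLogMessage class generated from this schema (e.g. with the avro-maven-plugin):

    import java.io.IOException;

    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.specific.SpecificDatumReader;

    import com.example.syslogmessage.SysLogMessage; // assumed generated from syslog.avsc

    public class SpecificRecordReader {

        // Decodes raw Avro bytes straight into the generated class; no schema
        // file or cast is needed because the class carries its own schema.
        public static SysLogMessage decode(byte[] payload) throws IOException {
            SpecificDatumReader<SysLogMessage> reader =
                    new SpecificDatumReader<>(SysLogMessage.class);
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
            return reader.read(null, decoder);
        }
    }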

1 Answer

  1. Answer #1

    The problem was solved by reading the message as a GenericRecord and accessing its fields by key/value instead of casting to a specific class: when no generated class matching the schema's full name is on the classpath, the SpecificDatumReader falls back to GenericData.Record, so the cast to Syslog can never succeed.

    public class AvroConsumer<T> {
    
        private static Properties kafkaProps;
    
        static {
            kafkaProps = new Properties();
            kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            kafkaProps.put("bootstrap.servers", "localhost:9092");
            kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "AvroConsumer-GroupOne");
        }
    
        public void recieveRecord() throws IOException {
            try (KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(kafkaProps)) {
                kafkaConsumer.subscribe(Arrays.asList("jason"));
                while (true) {
                    ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(100);
                    Schema.Parser parser = new Schema.Parser();
                    final Schema schema = parser
                            .parse(AvroProducer.class.getClassLoader().getResourceAsStream("syslog.avsc"));
                    records.forEach(record -> {
                        SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(schema);
                        ByteArrayInputStream is = new ByteArrayInputStream(record.value());
                        BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(is, null);
                        try {
                            // No cast to Syslog here; the decoded record is used as-is
                            T log = datumReader.read(null, binaryDecoder);
                            System.out.println("Value: " + log);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }
        }
    
    }
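    A usage note on the generic path: since T is erased at runtime, datumReader.read effectively hands back a GenericData.Record, and individual fields are accessed by name. A short sketch against the schema above (hypothetical variable names; note that Avro decodes string fields as org.apache.avro.util.Utf8, hence the toString()):

        import org.apache.avro.generic.GenericRecord;

        // Inside records.forEach(...), in place of the println:
        GenericRecord rec = (GenericRecord) datumReader.read(null, binaryDecoder);
        long partyID = (Long) rec.get("partyID");        // numeric fields come back boxed
        String message = rec.get("message").toString();  // string fields decode as Utf8
        System.out.println(partyID + ": " + message);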