有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Kafka流转发方法引发NullPointerException,因为ProcessorNode currentNode对象为null

我们已经编写了Kafka流应用程序,它从源主题读取数据,并在2处理器中执行一些业务逻辑,然后将输出写入sink主题

下面是创建拓扑、添加源、处理器(请注意,我们正在添加2个处理器)和接收器的代码

Topology topology = new Topology();
topology.addSource("sourceProcessor", "source-topic")
        .addProcessor("Process", ()->fileExtractProcessorObject , "sourceProcessor")
        .addProcessor("XMLJSON", () -> xmlJson , "Process")
        .addSink("sinkProcessor", "sink-topic", "XMLJSON");
KafkaStreams streams = new KafkaStreams(topology, getProperties(appConfig,sourceConfig));
        streams.start()

下面是第一个处理器代码

public class FileExtractProcessor implements Processor<String, byte[]> {
    private ProcessorContext context;
    public FileExtractProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, byte[] bytearray) {
            /* business logic */

            //This is pojo class which will be forwarded to next processor as value
            ProcessData pData = new ProcessData();

                // Setting pojo object and forwarding to next processor
                pData.setValue(value);
                pData.setXmlData(files);
                pData.setAppconfig(ac);

                context.forward(value.getRunKeyId(), pData, To.child("XMLJSON"));
                context.commit();    
    }
}

在上面的代码中,当我们调用forward方法时,我们会在下面的代码中得到null指针异常

org.apache.kafka.streams.processor.internals.ProcessorContextImpl#forward(K, V, org.apache.kafka.streams.processor.To)

ProcessorNode child = this.currentNode().getChild(sendTo); 

java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:78)
    at com.myapp.FileExtractProcessor.process(FileExtractProcessor.java:22)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)

我们正在使用kafka-stream-2.4.0。罐子 你能帮我找到我们失踪的地方吗。无法找出currentNode对象为Null的原因。 早期的帮助是非常感谢的


共 (1) 个答案

  1. # 1 楼答案

    根据代码:https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L183该错误意味着currentNode()返回null

    这表示您违反了ProcessorSupplier的规则,在get()上返回一个新的Processor实例;当您将() -> fileExtractProcessorObject传递给addProcessor每次返回相同的对象引用时,这似乎保持不变,而您每次都需要通过() -> new FileExtractProcessor()创建一个新实例

    我创建了一个票证来改进错误消息:https://issues.apache.org/jira/browse/KAFKA-10036