有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java无法在使用Spring cloud stream binder的Spring引导应用程序中检索KafkaStreams对象

所以我的问题是,我的属性文件中定义了一些卡夫卡主题,我可以从该主题中读取KafkaStream<String, String>,在我的SpringBoot应用程序中没有问题。但是我想访问KafkaStreams对象,以便打印对开发有用的KafkaStreams拓扑。 在我的一个@StreamListener中,我尝试检索流生成器进程bean,以便以这种方式获取底层的KafkaStreams对象(如下所述:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_accessing_the_underlying_kafkastreams_object),但不幸的是,它无法工作。 代码如下:

@StreamListener
public void processEvent(@Input("order-paid-stream") KStream<String, String> inputStream) {
    StreamsBuilderFactoryBean streamsBuilderFactoryBean = applicationContext.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
    KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
    System.out.println(kafkaStreams.toString());
    inputStream.foreach(this::handleMessage);
}

应用程序启动后,我收到以下消息:

enter image description here

在应用程序启动后,我还尝试以相同的方式在我的一个REST控制器方法上检索KafkaStreams对象,但出现了类似的错误(找不到该名称的bean)

有什么帮助吗


共 (2) 个答案

  1. # 1 楼答案

    从延迟的单独线程开始,以确保创建kafkaStream对象并完成拓扑

    streamsBuilderFactoryBean。getKafkaStreams()返回KafkaStream对象 streamsBuilderFactoryBean。GetSingleTonistance()。拓扑返回拓扑对象 streamsBuilderFactoryBean。getStreamsConfiguration()返回所有设置的kafka流配置

    class StreamsListener {
        @StreamListener
        @SendTo("output")
        public KStream<String, String> process(@Input("input') KStream<String,String> rawCloudEventKStream {
    
            new Thread(() -> {
                try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); }
    
                StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-StreamsListener-process", StreamsBuilderFactoryBean.class);
                System.out.println("KafkaStreams configs: " + streamsBuilderFactoryBean.getStreamsConfiguration());
                KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
            }).start();
    
            ...
        }
    }
    
  2. # 2 楼答案

    我遇到了一个类似的问题,并发现我还需要包括进程函数在例如stream-builder-MyStreamProcessor-process中定义的类名