java无法在使用Spring cloud stream binder的Spring引导应用程序中检索KafkaStreams对象
所以我的问题是,我的属性文件中定义了一些卡夫卡主题,我可以从该主题中读取KafkaStream<String, String>
,在我的SpringBoot应用程序中没有问题。但是我想访问KafkaStreams
对象,以便打印对开发有用的KafkaStreams
拓扑。
在我的一个@StreamListener
中,我尝试检索流生成器进程bean,以便以这种方式获取底层的KafkaStreams
对象(如下所述:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html#_accessing_the_underlying_kafkastreams_object),但不幸的是,它无法工作。
代码如下:
@StreamListener
public void processEvent(@Input("order-paid-stream") KStream<String, String> inputStream) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = applicationContext.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
System.out.println(kafkaStreams.toString());
inputStream.foreach(this::handleMessage);
}
应用程序启动后,我收到以下消息:
在应用程序启动后,我还尝试以相同的方式在我的一个REST控制器方法上检索KafkaStreams
对象,但出现了类似的错误(找不到该名称的bean)
有什么帮助吗
# 1 楼答案
从延迟的单独线程开始,以确保创建kafkaStream对象并完成拓扑
streamsBuilderFactoryBean。getKafkaStreams()返回KafkaStream对象 streamsBuilderFactoryBean。GetSingleTonistance()。拓扑返回拓扑对象 streamsBuilderFactoryBean。getStreamsConfiguration()返回所有设置的kafka流配置
# 2 楼答案
我遇到了一个类似的问题,并发现我还需要包括进程函数在例如
stream-builder-MyStreamProcessor-process
中定义的类名