有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用Spring云流源将方法结果发送到流

我试图在Spring Boot应用程序中创建一个Spring Cloud Stream源Bean,它只将方法的结果发送到流(底层Kafka主题绑定到流)

我看到的大多数流样本都使用@InboundChannelAdapter注释,使用轮询器向流发送数据。但我不想用民意测验。我曾尝试将轮询器设置为空数组,但另一个问题是,当使用@InboundChannelAdapter时,您无法获得任何方法参数

我试图做的事情的整体概念是从入站流中读取的。执行一些异步处理,然后将结果发布到出站流。因此,使用处理器似乎也不是一种选择。我使用@StreamListener和一个接收通道来读取入站流,这很有效

这是我一直在尝试的一些代码,但根本不起作用。我希望它会这么简单,因为我的接收器是,但可能不是。找人告诉我一个不是处理器(即不需要监听入站频道)的源的例子,并且不使用@InboundChannelAdapter,或者给我一些设计技巧,以不同的方式完成我需要做的事情。谢谢

@EnableBinding(Source.class)
public class JobForwarder {

   @ServiceActivator(outputChannel = Source.OUTPUT)
   @SendTo(Source.OUTPUT)
   public String forwardJob(String message) {
       log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
       return message;
   }
}

共 (1) 个答案

  1. # 1 楼答案

    谢谢你的意见。我花了一段时间才回到这个问题上来。我确实试过阅读@Publisher的文档。它看起来正是我所需要的,但我就是无法初始化正确的bean来正确连接它

    为了回答您的问题,在对输入进行异步处理之后,会调用forwardJob()方法

    最终,我只是直接使用spring-kafka库来实现,这更加明确,而且感觉更容易开始。我认为我们将坚持卡夫卡作为唯一的通道绑定,所以我认为我们将坚持使用该库

    然而,我们最终让spring cloud stream库非常简单地工作。这是一个没有轮询器的单一源代码

    @Component
    @EnableBinding(Source.class)
    public class JobForwarder {
    
        private Source source;
    
        @Autowired
        public ScheduledJobForwarder(Source source) {
            this.source = source;
        }
    
        public void forwardScheduledJob(String message) {
            log.info(String.format("Forwarding a job message [%s] to queue [%s]", message, Source.OUTPUT));
            source.output().send(MessageBuilder.withPayload(message).build());
        }
    }