有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RxJava在可观察对象中收集AMPQ事件,并使用缓冲区订阅

我必须收集一些AMPQ事件,然后使用缓冲区每隔10秒打印一次

private Observable<Event> obs = Observable.empty(); 
private final Disposable disposable = obs.buffer(10, SECONDS)
                              .retry(t -> true)
                              .subscribe(System.out::println);

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
      obs = obs.concatWith(Observable.just(event));
}

事件是消息,无效句柄是消费者

我调试了这段代码,它只打印一个空列表,这很有意义,因为obs是空的

如何添加(concat?)可观察到事件并持续执行一次性? 谢谢


共 (1) 个答案

  1. # 1 楼答案

    你需要一个Subject可以订阅。 可以使用next(T element)将新元素推送到主题中

    private Subject<Event> subject = ReplaySubject.create();
    
    @Override
    public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
         subject.next(event);
    }
    
    public Observable<Event> getObservable() {
        subject.asObservable();
    }
    

    您可以订阅由getObservable()方法返回的observable