有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java将RxJava 1.1.5改编为反应堆堆芯3.1.0。M3

我试图使用一个使用RxJava 1.1.5和SpringWebFlux的库(即Reactor Core 3.1.0.M3),但是我在将Observable调整到Flux时遇到了问题

我认为这将相对简单,但我的适配器无法工作:

import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;

public static <T> Flux<T> toFlux(Observable<T> observable) {
    return Flux.create(emitter -> {
        final Subscription subscription = observable.subscribe(new Subscriber<T>() {
            @Override
            public void onNext(T value) {
                emitter.next(value);
            }
            @Override
            public void onCompleted() {
                emitter.complete();
            }
            @Override
            public void onError(Throwable throwable) {
                emitter.error(throwable);
            }
        });
        emitter.onDispose(subscription::unsubscribe);
    });
}

我已经验证了onNextonCompleted都以正确的顺序被调用,但是我的Flux总是空的。有人知道我做错了什么吗

另一方面,为什么在reactor-addons中没有rxjava1的适配器


共 (1) 个答案

  1. # 1 楼答案

    使用RxJavaReactiveStreams适配器将Observable转换为Publisher,然后让Flux.fromPublisher()使用它

    compile 'io.reactivex:rxjava-reactive-streams:1.2.1'
    
    Observable<T> o = ...
    
    Flux.from(RxReactiveStreams.toPublisher(o));
    

    On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?

    他们不想支持或鼓励使用这种老技术,我完全同意