java将RxJava 1.1.5改编为反应堆堆芯3.1.0。M3
我试图使用一个使用RxJava 1.1.5和SpringWebFlux的库(即Reactor Core 3.1.0.M3),但是我在将Observable
调整到Flux
时遇到了问题
我认为这将相对简单,但我的适配器无法工作:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
我已经验证了onNext
和onCompleted
都以正确的顺序被调用,但是我的Flux
总是空的。有人知道我做错了什么吗
另一方面,为什么在reactor-addons中没有rxjava1的适配器
# 1 楼答案
使用RxJavaReactiveStreams适配器将
Observable
转换为Publisher
,然后让Flux.fromPublisher()
使用它他们不想支持或鼓励使用这种老技术,我完全同意