java RxJava:如何组合多个具有依赖性的观测值,并在最后收集所有结果?
我正在学习RxJava,作为我的第一个实验,我试图在this code中的第一个run()
方法中重写代码(在Netflix's blog中引用为RxJava可以帮助解决的一个问题),以使用RxJava改进其异步性,也就是说,它不必等待第一个未来(f1.get()
)的结果,然后再继续执行代码的其余部分
f3
取决于f1
。我看到了如何处理这个问题,flatMap
似乎做到了:
Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
}
});
接下来,f4
和f5
依赖于f2
。我有这个:
final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer i) {
Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
return Observable.merge(f4Observable, f5Observable);
}
});
这开始变得很奇怪(merge
我可能不想看到他们…)但最终允许我这样做,而不是我想要的:
f3Observable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("Observed from f3: " + s);
f4And5Observable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
System.out.println("Observed from f4 and f5: " + i);
}
});
}
});
这给了我:
Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100
这是所有的数字,但不幸的是,我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终println:
System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
我不明白如何在同一行中访问这两个返回值。我想这里可能缺少一些函数式编程。我该怎么做?谢谢
# 1 楼答案
看起来你真正需要的是更多的鼓励和对如何使用RX的看法。我建议您阅读更多的文档以及大理石图(我知道它们并不总是有用的)。我还建议研究
lift()
函数和运算符map
、flatMap
和filter
的目的是操作数据流中的数据运算符的作用是允许您中断稳定的可观测数据流,并定义您自己对数据流的操作。例如,我编写了一个移动平均运算符。这将}在可观察的双倍中进行汇总,以返回移动平均值流。代码看起来就像这样
n
{Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))
您会松一口气,因为您认为理所当然的许多过滤方法都有
lift()
隐藏话虽如此,;合并多个依赖项所需的全部工作是:
map
或flatMap
将所有传入数据更改为标准数据类型# 2 楼答案
<强>编辑:< /强>某人把下面的文本转换成一个答案,我把它作为一个编辑,作为一个答案,我很欣赏,理解可能是适当的事情要做,但是我不认为这是一个答案,因为它显然不是正确的方式。我不会使用此代码,也不会建议任何人复制它欢迎其他/更好的解决方案和评论强>
我能够用以下方法解决这个问题。我没有意识到你可以不止一次观察到一个结果,我假设结果只能被消耗一次。所以我只是
flatMap
f2Observable两次(对不起,我在我的原始帖子中重命名了代码中的一些东西),然后zip
对所有的Observable,然后订阅它。在zip
中使用Map
来聚合值是不受欢迎的,因为类型转换欢迎其他/更好的解决方案和评论The full code is viewable in a gist。多谢各位这就产生了我想要的结果:
# 3 楼答案
我想你要找的是开关地图。我们遇到了一个类似的问题,我们有一个会话服务来处理从api获取新会话,我们需要该会话才能获取更多数据。我们可以将sessionobservate添加到返回sessionToken的sessionobservate中,以便在数据调用中使用
getSession返回一个可观察的
getData获取可观察到的数据并附加到它
您仍然需要一个平面图来组合非相关的观测值
Plunkr:http://plnkr.co/edit/hiA1jP?p=info
在那里我想到了使用开关映射:http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html