有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RxJava:如何组合多个具有依赖性的观测值,并在最后收集所有结果?

我正在学习RxJava,作为我的第一个实验,我试图在this code中的第一个run()方法中重写代码(在Netflix's blog中引用为RxJava可以帮助解决的一个问题),以使用RxJava改进其异步性,也就是说,它不必等待第一个未来(f1.get())的结果,然后再继续执行代码的其余部分

f3取决于f1。我看到了如何处理这个问题,flatMap似乎做到了:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

接下来,f4f5依赖于f2。我有这个:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

这开始变得很奇怪(merge我可能不想看到他们…)但最终允许我这样做,而不是我想要的:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

这是所有的数字,但不幸的是,我在单独的调用中得到了结果,所以我不能完全替换原始代码中的最终println:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

我不明白如何在同一行中访问这两个返回值。我想这里可能缺少一些函数式编程。我该怎么做?谢谢


共 (3) 个答案

  1. # 1 楼答案

    看起来你真正需要的是更多的鼓励和对如何使用RX的看法。我建议您阅读更多的文档以及大理石图(我知道它们并不总是有用的)。我还建议研究lift()函数和运算符

    • 一个可观察对象的全部要点是将数据流和数据操作连接到一个对象中
    • 调用mapflatMapfilter的目的是操作数据流中的数据
    • 合并的重点是合并数据流
    • 运算符的作用是允许您中断稳定的可观测数据流,并定义您自己对数据流的操作。例如,我编写了一个移动平均运算符。这将n{}在可观察的双倍中进行汇总,以返回移动平均值流。代码看起来就像这样

      Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

    您会松一口气,因为您认为理所当然的许多过滤方法都有lift()隐藏

    话虽如此,;合并多个依赖项所需的全部工作是:

    • 使用mapflatMap将所有传入数据更改为标准数据类型
    • 将标准数据类型合并到流
    • 如果一个对象需要等待另一个对象,或者需要在流中排序数据,则使用自定义运算符。警告:此方法将减慢流速度
    • 用于列出或订阅以收集所有这些数据
  2. # 2 楼答案

    <强>编辑:< /强>某人把下面的文本转换成一个答案,我把它作为一个编辑,作为一个答案,我很欣赏,理解可能是适当的事情要做,但是我不认为这是一个答案,因为它显然不是正确的方式。我不会使用此代码,也不会建议任何人复制它欢迎其他/更好的解决方案和评论


    我能够用以下方法解决这个问题。我没有意识到你可以不止一次观察到一个结果,我假设结果只能被消耗一次。所以我只是flatMapf2Observable两次(对不起,我在我的原始帖子中重命名了代码中的一些东西),然后zip对所有的Observable,然后订阅它。在zip中使用Map来聚合值是不受欢迎的,因为类型转换欢迎其他/更好的解决方案和评论The full code is viewable in a gist。多谢各位

    Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
    Observable<Integer> f2Observable = Observable.from(f2);
    Observable<Integer> f4Observable = f2Observable
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                System.out.println("Observed from f2: " + integer);
                Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
                return Observable.from(f4);
            }       
        });     
    
    Observable<Integer> f5Observable = f2Observable
        .flatMap(new Func1<Integer, Observable<Integer>>() {
            @Override
            public Observable<Integer> call(Integer integer) {
                System.out.println("Observed from f2: " + integer);
                Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
                return Observable.from(f5);
            }       
        });     
    
    Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
        @Override
        public Map<String, String> call(String s, Integer integer, Integer integer2) {
            Map<String, String> map = new HashMap<String, String>();
            map.put("f3", s);
            map.put("f4", String.valueOf(integer));
            map.put("f5", String.valueOf(integer2));
            return map;
        }       
    }).subscribe(new Action1<Map<String, String>>() {
        @Override
        public void call(Map<String, String> map) {
            System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
        }       
    });     
    

    这就产生了我想要的结果:

    responseB_responseA => 714000
    
  3. # 3 楼答案

    我想你要找的是开关地图。我们遇到了一个类似的问题,我们有一个会话服务来处理从api获取新会话,我们需要该会话才能获取更多数据。我们可以将sessionobservate添加到返回sessionToken的sessionobservate中,以便在数据调用中使用

    getSession返回一个可观察的

    public getSession(): Observable<any>{
      if (this.sessionToken)
        return Observable.of(this.sessionToken);
      else if(this.sessionObservable)
        return this.sessionObservable;
      else {
        // simulate http call 
        this.sessionObservable = Observable.of(this.sessonTokenResponse)
        .map(res => {
          this.sessionObservable = null;
          return res.headers["X-Session-Token"];
        })
        .delay(500)
        .share();
        return this.sessionObservable;
      }
    }
    

    getData获取可观察到的数据并附加到它

    public getData() {
      if (this.dataObservable)
        return this.dataObservable;
      else {
        this.dataObservable = this.sessionService.getSession()
          .switchMap((sessionToken:string, index:number) =>{
            //simulate data http call that needed sessionToken
              return Observable.of(this.dataResponse)
              .map(res => {
                this.dataObservable = null;
                return res.body;
              })
              .delay(1200)
            })
            .map ( data => {
              return data;
            })
            .catch(err => {
              console.log("err in data service", err);
             // return err;
            })
            .share();
        return this.dataObservable;
      }
    }
    

    您仍然需要一个平面图来组合非相关的观测值

    Plunkr:http://plnkr.co/edit/hiA1jP?p=info

    在那里我想到了使用开关映射:http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html