有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RXJava忽略错误并在链中继续

我有一个可以抛出错误的RXJava链。 我希望链继续,忽略错误,并达到订阅,只有一些具体的错误

例如,在这段代码中

authDataManager
                    .getUserAccessToken(username, password)
                    .subscribeOn(Schedulers.io())
                    .doOnNext({
                        authDataManager.saveUserAccessToken(it)
                    })
                    .flatMap {
                        appDataManager.getStations()
                    }
                    .doOnNext({
                        appDataManager.persistStations(it)
                    })
                    .flatMap {
                        appDataManager.getDriverInformation()
                    }
                    .doOnNext({
                        appDataManager.persistDriverInformation(it)
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            {
                                onLoginSuccess()
                            },
                            {
                                onLoginFailure(it)
                            }
                    )

如果appDataManager.getStations()抛出一个错误,我仍然希望继续并到达onLoginSuccess()方法

但是,如果getUserAccessToken(username, password)失败,则应调用onLoginFailure

我曾尝试在平面图之后及其内部添加OneRorResumeNext()和OneExceptionResumeNext(),但如果我这样做,链就会退出,不会继续并到达订阅服务器


共 (3) 个答案

  1. # 1 楼答案

    我想你可以使用onErrorResumeNext操作符

                  authDataManager
                        .getUserAccessToken(username, password)
                        .subscribeOn(Schedulers.io())
                        .doOnNext({
                            authDataManager.saveUserAccessToken(it)
                        })
                        .flatMap {
                            appDataManager.getStations()
                                 .onErrorResumeNext(Observable.empty())
                        }
                        .doOnNext({
                            appDataManager.persistStations(it)
                        })
                        .flatMap {
                            appDataManager.getDriverInformation()
                        }
                        .doOnNext({
                            appDataManager.persistDriverInformation(it)
                        })
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(
                                {
                                    onLoginSuccess()
                                },
                                {
                                    onLoginFailure(it)
                                }
                        )
    
  2. # 2 楼答案

    我是在科特林发现的

    它可以通过使用映射、过滤、选项和onErrorResumeNext的组合来实现。我认为这和当前版本的RxJava一样优雅

    authDataManager
        .getUserAccessToken(username, password)
        .subscribeOn(Schedulers.io())
        .doOnNext({
            authDataManager.saveUserAccessToken(it)
        })
        .flatMap {
            appDataManager.getStations()
                .map(stations -> Single.just(Optional.of(stations))
                .onErrorResumeNext(error -> Single.just(Optional.empty()))                    
         }
         .filter(optional -> optional.isPresent())
         .map(stations -> optional.get())
         .doOnNext({
             appDataManager.persistStations(it)
         })
         .flatMap {
             appDataManager.getDriverInformation()
         }
         .doOnNext({
             appDataManager.persistDriverInformation(it)
         })
         .observeOn(AndroidSchedulers.mainThread())
         .subscribe(
             {
                 onLoginSuccess()
             },
             {
                 onLoginFailure(it)
             }
         )
    
  3. # 3 楼答案

    站在@savepopulation答案的肩膀上,观察你有三个不同的观察者链:

             authDataManager
                    .getUserAccessToken(username, password)
                    .subscribeOn(Schedulers.io())
                    .doOnNext({
                        authDataManager.saveUserAccessToken(it)
                    })
                    .doOnNext(
                        appDataManager.getStations()
                             .onErrorResumeNext(Observable.empty())
                             .subscribe( appDataManager.persistStations(it) )
                    )
                    .doOnNext( appDataManager.getDriverInformation()
                                 .subscribe( appDataManager.persistDriverInformation(it)
                    )
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(
                            {
                                onLoginSuccess()
                            },
                            {
                                onLoginFailure(it)
                            }
                    )
    

    主观察链流向完成,而两个子链根本不参与日志;它们只是通过获取用户访问令牌触发的

    如果需要这些分支与主链异步运行,则必须添加subscribeOn()/observeOn()运算符