有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

如何阻止rxjava runnable发出?

我可以观察到以下情况:

      ScheduledExecutorService executorService = Executors.newScheduledThreadPool( 1 );
      Observable<List<Widget>> findWidgetsObservable = Observable.create( emitter -> {
         executorService.scheduleWithFixedDelay( emitFindWidgets( emitter, 0, 30, TimeUnit.SECONDS );
      } );

     private Runnable emitFindWidgets( ObservableEmitter<List<Widgets>> emitter ) {
        return () -> {
              emitter.onNext( Collections.emptyList() ); // dummy empty array
        };
     }

我在graphql java订阅解析器中返回它,如下所示:

      ConnectableObservable<List<Widget>> connectableObservable = findWidgetsObservable.share().publish();
      Disposable connectionDisposable = connectableObservable.connect();
      return connectableObservable.toFlowable( BackpressureStrategy.LATEST )

graphql订阅按预期工作,并向JavaScript graphql客户端发送数据,但当客户端取消订阅时,我的Runnable似乎无限期地继续。也就是说,flowable的doOnCancel()事件处理程序正在运行

为了解决这个问题,我尝试在flowable的doOnCancel()中执行以下操作:

    Disposable connectionDisposable = connectableObservable.connect();
    return connectableObservable.toFlowable( BackpressureStrategy.LATEST ).doOnCancel( () -> {
             findWidgetsObservable.toFuture().cancel( true );
             connectionDisposable.dispose();
    })

然而,Runnable继续无限期地省略。有什么办法可以解决这个问题并完全停止排放吗

我确实有一个想法:scheduleWithFixedDelay返回一个ScheduledFuture,它有一个cancel()方法,但我不确定当调度本身被限定在一个可观察的范围内时,我是否可以这样做!感谢您的帮助


共 (1) 个答案

  1. # 1 楼答案

    runnable会继续发射,因为您正在一个不知道/不绑定到可观察流的调度器上调度发射

    当您处理连接时,您停止从上游接收项目,因为与上游可观察的连接被切断。但是,由于您正在安排发射器在单独的调度器上重复运行,所以runnable会继续运行

    您可以使用自定义调度程序描述自定义调度行为,并将其传入subscribeOn(Your-Custom-Scheduler)

    此外,您还提到可以在doOnDispose()中调用cancel()

    但是你应该在可观察链中明确地切换调度器。否则,调试就会变得更加困难