有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java任务取消在RxJava中是如何工作的?

我不清楚如何在RXJava中实现任务取消

我对移植使用Guava的ListenableFuture构建的现有API感兴趣。我的用例如下:

  • 我有一个单一的操作,它由一系列由Futures.transform()连接的未来组成
  • 多个用户观察该行动的最终未来
  • 每个观察者都可以取消最终的未来,所有观察者都可以见证取消事件
  • 最终未来的取消会导致其依赖项的取消,例如按顺序1->2->3,对{}的取消被传播到{},依此类推

RxJava维基中几乎没有关于这方面的信息;我能找到的唯一参考文献提到Subscription相当于。NET的Disposable,但就我所见,订阅只提供从序列中的后续值取消订阅的功能

我不清楚如何通过这个API实现“任何订阅者都可以取消”语义。我是不是想错了

如有任何意见,将不胜感激


共 (1) 个答案

  1. # 1 楼答案

    了解Cold vs Hot Observables很重要。如果你的观察对象是冷的,那么如果你没有订户,它们的操作将不会执行。因此,要“取消”,只需确保所有观察者都取消订阅源Observable

    但是,如果只有源的一个观察者取消订阅,并且还有其他观察者订阅了源,则不会导致“取消”。在这种情况下,您可以使用ConnectableObservables(但这不是唯一的解决方案)。另见this link about Rx.NET

    使用ConnectableObservable的一种实用方法是简单地对任何冷的Observable调用.publish().refCount()。这样做的目的是创建一个“代理”观察者,将事件从源传递到实际观察者。代理观察者在最后一个实际观察者退订时退订

    要手动控制ConnectableObservable,只需调用coldSource.publish(),就会得到ConnectableObservable的一个实例。然后您可以调用.connect(),它将返回您对“代理”观察者的订阅。要手动“取消”源,只需取消订阅代理观察者


    对于您的特定问题,还可以使用.takeUntil()运算符

    假设您的“最终未来”在RxJava中被移植为finalStream,并且假设“取消事件”是可观察的cancelStream1cancelStream2,等等,那么由finalStream产生的“取消”操作变得相当简单:

    Observable<FooBar> finalAndCancelableStream = finalStream
        .takeUntil( Observable.merge(cancelStream1, cancelStream2) );
    

    在图中,this is how takeUntil worksthis is how merge works

    在简单的英语中,您可以将其理解为“finalAndCancelableStream是finalStream,直到cancelStream1或cancelStream2发出事件为止”