java任务取消在RxJava中是如何工作的?
我不清楚如何在RXJava中实现任务取消
我对移植使用Guava的ListenableFuture
构建的现有API感兴趣。我的用例如下:
- 我有一个单一的操作,它由一系列由
Futures.transform()
连接的未来组成 - 多个用户观察该行动的最终未来李>
- 每个观察者都可以取消最终的未来,所有观察者都可以见证取消事件李>
- 最终未来的取消会导致其依赖项的取消,例如按顺序
1
->2
->3
,对{}的取消被传播到{ },依此类推李>
RxJava维基中几乎没有关于这方面的信息;我能找到的唯一参考文献提到Subscription
相当于。NET的Disposable
,但就我所见,订阅只提供从序列中的后续值取消订阅的功能
我不清楚如何通过这个API实现“任何订阅者都可以取消”语义。我是不是想错了
如有任何意见,将不胜感激
# 1 楼答案
了解Cold vs Hot Observables很重要。如果你的观察对象是冷的,那么如果你没有订户,它们的操作将不会执行。因此,要“取消”,只需确保所有观察者都取消订阅源Observable
但是,如果只有源的一个观察者取消订阅,并且还有其他观察者订阅了源,则不会导致“取消”。在这种情况下,您可以使用ConnectableObservables(但这不是唯一的解决方案)。另见this link about Rx.NET
使用ConnectableObservable的一种实用方法是简单地对任何冷的Observable调用
.publish().refCount()
。这样做的目的是创建一个“代理”观察者,将事件从源传递到实际观察者。代理观察者在最后一个实际观察者退订时退订要手动控制ConnectableObservable,只需调用
coldSource.publish()
,就会得到ConnectableObservable的一个实例。然后您可以调用.connect()
,它将返回您对“代理”观察者的订阅。要手动“取消”源,只需取消订阅代理观察者对于您的特定问题,还可以使用
.takeUntil()
运算符假设您的“最终未来”在RxJava中被移植为
finalStream
,并且假设“取消事件”是可观察的cancelStream1
、cancelStream2
,等等,那么由finalStream
产生的“取消”操作变得相当简单:在图中,this is how takeUntil works和this is how merge works
在简单的英语中,您可以将其理解为“finalAndCancelableStream是finalStream,直到cancelStream1或cancelStream2发出事件为止”