java使用缓冲区合并热态和冷态
我有两个数据来源
- 获取项目列表的当前状态的服务调用
- 这些项目的一系列更新
我正在尝试以一种不会在进行服务呼叫时丢失任何更新的方式合并它们
我做了一个简单的测试来做实验,我想这说明了我想要什么
private val coldObservable = Observable.just(1, 2, 3, 4, 5)
private val subject = PublishSubject.create<Int>()
private val hotObservable = subject.hide()
@Test
fun test() {
subject.onNext(10)
val test = Flowable.concat(
coldObservable.toFlowable(BackpressureStrategy.BUFFER), hotObservable.toFlowable(BackpressureStrategy.BUFFER)
)
.doOnSubscribe { subject.onNext(20) }
.test()
test.await(1, TimeUnit.SECONDS)
test.assertNotComplete()
test.assertNotTerminated()
subject.onComplete()
test.assertComplete()
test.assertValues(1,2,3,4,5)
}
实际上,这项测试通过了。但我真正想要的是
test.assertValues(1,2,3,4,5,10,20)
我原以为背压会很容易让我保持排放,但我想不会,因为它没有订阅
没有办法将这两个源合并到一个流中吗
# 1 楼答案
这里有一个解决方案,具有类似的效果,我正在寻找。 它存储实时数据,然后在服务调用完成后进行处理
它不会发出一个流,我仍然不确定这是否可行
# 2 楼答案
我不确定我是否理解你的所有代码(我自己是新来的)。但您的用例似乎与我的用例相同
我的用例是这样的(因此您可以决定它是否相关):我使用HTTP请求来获取数据结构的初始状态。但是一个web套接字流不断地发送对该数据的更新。如果我首先初始化数据结构,然后用流更新它,那么在这两者之间的短时间内更新就会丢失。因此,我需要首先获取流,在等待HTTP请求返回数据结构的同时缓存其更新数据,然后在数据上应用流(缓存中)的积压更新。从那个里开始,随着套接字中的数据进入,进行更新
我不知道Kotlin,所以在Java中:
这将产生:
scanWith: http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#scanWith-java.util.concurrent.Callable-io.reactivex.functions.BiFunction-
您还可以使用scan(直接提供初始数据,而不是使用可调用的): http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#scan-R-io.reactivex.functions.BiFunction-