有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用缓冲区合并热态和冷态

我有两个数据来源

  1. 获取项目列表的当前状态的服务调用
  2. 这些项目的一系列更新

我正在尝试以一种不会在进行服务呼叫时丢失任何更新的方式合并它们

我做了一个简单的测试来做实验,我想这说明了我想要什么

private val coldObservable = Observable.just(1, 2, 3, 4, 5)
private val subject = PublishSubject.create<Int>()
private val hotObservable = subject.hide()

@Test
fun test() {
    subject.onNext(10)

    val test = Flowable.concat(
            coldObservable.toFlowable(BackpressureStrategy.BUFFER), hotObservable.toFlowable(BackpressureStrategy.BUFFER)
    )
            .doOnSubscribe { subject.onNext(20) }
            .test()

    test.await(1, TimeUnit.SECONDS)
    test.assertNotComplete()
    test.assertNotTerminated()
    subject.onComplete()
    test.assertComplete()
    test.assertValues(1,2,3,4,5)
}

实际上,这项测试通过了。但我真正想要的是

test.assertValues(1,2,3,4,5,10,20)

我原以为背压会很容易让我保持排放,但我想不会,因为它没有订阅

没有办法将这两个源合并到一个流中吗


共 (2) 个答案

  1. # 1 楼答案

    这里有一个解决方案,具有类似的效果,我正在寻找。 它存储实时数据,然后在服务调用完成后进行处理

    @Test
    fun test2() {
        val randomString = "abc123"
        val realLog = ConcurrentHashMap<String, Boolean>()
    
        var isLive = false
    
        val subject = PublishSubject.create<Pair<String, Boolean>>()
    
        val subscription = subject.hide()
                .doOnNext {
                    if (isLive) {
                        realLog[it.first] = it.second
                    } else {
    
                        realLog[it.first+ randomString] = it.second
                    }
                }.subscribe()
    
        subject.onNext(Pair("one", false))
    
        val coldTest = Observable.just("one","two","three")
                .map { Pair(it, true) }
                .doOnSubscribe {
                    subject.onNext(Pair("twenty", false))
                }
                .doOnNext{
                    realLog[it.first] = it.second
                }
                .doOnComplete {
                    val iterator = realLog.keys.iterator()
                    while(iterator.hasNext()){
                        val oldKey = iterator.next()
                        if(oldKey.contains(randomString)){
                            val newKey = oldKey.removeSuffix(randomString)
                            realLog[newKey] = realLog[oldKey]!!
                            realLog.remove(oldKey)
                        }
                    }
                    isLive = true
                    subject.onNext(Pair("Thirty", false))
                }
                .test()
    
        coldTest.awaitTerminalEvent()
        coldTest.assertComplete()
    
        Assert.assertFalse(realLog["Thirty"]!!)
    

    它不会发出一个流,我仍然不确定这是否可行

  2. # 2 楼答案

    我不确定我是否理解你的所有代码(我自己是新来的)。但您的用例似乎与我的用例相同

    我的用例是这样的(因此您可以决定它是否相关):我使用HTTP请求来获取数据结构的初始状态。但是一个web套接字流不断地发送对该数据的更新。如果我首先初始化数据结构,然后用流更新它,那么在这两者之间的短时间内更新就会丢失。因此,我需要首先获取流,在等待HTTP请求返回数据结构的同时缓存其更新数据,然后在数据上应用流(缓存中)的积压更新。从那个里开始,随着套接字中的数据进入,进行更新

    我不知道Kotlin,所以在Java中:

    public static void main(String[] args) throws InterruptedException {
        //This one would correspond to the continuous stream of updates.
        //It sends Long, one each second
        Observable<Long> listUpdater = Observable.interval(1, TimeUnit.SECONDS);
        //Acts as both observable and observer, that replays everything to its own
        //observers.
        Subject<Long> replaySubjbect = ReplaySubject.create();
        //Start getting items from listUpdater immediately
        listUpdater.subscribe(replaySubjbect);
        //To show that listUpdater moves along regardless of what you are doing, as would
        //be the case for example with a websocket.
        listUpdater.subscribe(i -> System.out.println("I am also watching, i=" + i));
        //This callable corresponds to whatever returns the initial state of the list,
        //for example some http request
        Callable<List<Long>> clbl = () -> Arrays.asList(1L, 2L, 3L, 4L, 5L);
        //This is where the merging takes place, although we are not using one of the merging
        //methods, but scanWith. 
        Observable<List<Long>> theStream = replaySubjbect.scanWith(clbl, (List<Long> list, Long l) -> {
            //This updater adds l to each element of the list
            System.out.println("I am adding " + l + " to each element in the list");
            return list.stream().mapToLong(k -> k + l).boxed().collect(Collectors.toList());            
        });
        //Simulate a response time for the http request, or whatever it is the callable is doing
        Thread.sleep(3000);
        //Now we get the stream of lists, updated for each of the update data sent by the
        //original listUpdater
        theStream.subscribe(list -> System.out.println("theStream sent me this: " + list));
        //Just to see how it works, we sleep some more
        Thread.sleep(5000); 
    }
    

    这将产生:

    I am also watching, i=0
    I am also watching, i=1
    I am also watching, i=2
    theStream sent me this: [1, 2, 3, 4, 5] <- cahce retrieval starts
    I am adding 0 to each element in the list 
    theStream sent me this: [1, 2, 3, 4, 5] 
    I am adding 1 to each element in the list 
    theStream sent me this: [2, 3, 4, 5, 6] 
    I am adding 2 to each element in the list 
    theStream sent me this: [4, 5, 6, 7, 8]
    I am also watching, i=3     
    I am adding 3 to each element in the list <- from here on, update list as updates come in
    theStream sent me this: [7, 8, 9, 10, 11]
    I am also watching, i=4
    I am adding 4 to each element in the list
    theStream sent me this: [11, 12, 13, 14, 15]
    I am also watching, i=5
    I am adding 5 to each element in the list
    theStream sent me this: [16, 17, 18, 19, 20]
    I am also watching, i=6
    I am adding 6 to each element in the list
    theStream sent me this: [22, 23, 24, 25, 26]
    I am also watching, i=7
    I am adding 7 to each element in the list
    theStream sent me this: [29, 30, 31, 32, 33]
    

    scanWith: http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#scanWith-java.util.concurrent.Callable-io.reactivex.functions.BiFunction-

    您还可以使用scan(直接提供初始数据,而不是使用可调用的): http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#scan-R-io.reactivex.functions.BiFunction-