有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java项目反应器将两个发布服务器有状态地组合起来并发出结果

我想设计一个带有反应器的处理管道,如下所示

我有两个输入发布程序orderEntries(冷)和hotBroadcasts(热)。我想将由hotBroadcasts发出的项聚合到内存数据结构中(可变的)——比如说a HashMap——并且对于来自orderEntries的每个项,我想从该映射中选择相应的元素,创建结果项并推送到下游订户

来自hotBroadcasts的事件以任意顺序出现,这就是为什么我想将它们存储在内存中以便检索

从概念上讲,它应该是这样工作的:

       orderEntries                      hotBroadcasts
           |                                   | 
           |                                   | 
           |                                   | 
           \                                   / 
            ----------------> <----------------
                   (aggregate events from hotBroadcasts)     
                             |
                             |
                        resulting item
                             |
                             |
                            \/
                      downstream subcriber  

到目前为止,我设法用ReplayProcessor来绘制一个解决方案,如Kotlin伪示例所示:

val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)

orderEntries.concatMap { entryId ->
    // problematic filter - skims through all that ReplayProcessor has cached
    hotBroadcasts.filter { broadcastId ->
        "Broadcast:$entryId" == broadcastId
    }
    .take(1)
    .map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }

Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
        .concatMap { Flux.just(it, it - 100000) }
        .map { "Broadcast:$it" }
        .subscribe {
            hotBroadcasts.onNext(it)
        }

这里的问题是hotBroadcast的过滤会略过orderEntries中每个项目的所有项目。因此,我想到将它们存储在HashMap中

有人能给我指一下正确的方向吗


共 (1) 个答案

  1. # 1 楼答案

    可以聚合来自两个不同发布者的消息的对象是一个带有2个参数的异步过程调用。这种调用可以在rxjava中使用io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func)构造,也可以在纯Java中使用java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func)构造

    您需要一个特殊的HashMap来保存异步过程调用。当首次使用给定标签访问此HashMap时,应自动创建调用

    所以一位公众援引了她的话

    asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
    asyncProc.arg1.complete(value);
    

    另一个Publicher调用

    asyncProc=callMap.get(label); // previously created instance returned
    asyncProc.arg2.complete(value);
    

    在两个发布者都提供了参数之后,异步过程将被执行