java如何将通量链接到另一个通量/单声道并应用另一个背压?
我有以下使用堆芯磁通量的无功代码:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
正如您所看到的,对于我的流程的外部源(FluxSink.OverflowStrategy.LATEST),我对此进行了背压处理。但是,我还想将我的进程的背压配置为redis(redisHashReactiveCommands.hmset(key,map)),因为它可能是比进程的外部源更大的瓶颈。我希望我需要为redis部分创建另一个流量,并将其与此流量链接,但从那时起我该如何实现这一点呢。flatMap对单个项目有效,而不是对项目流有效
此外,我还想将相同的发射项目存储到卡夫卡中,但链接flapMap似乎不起作用。。是否有一种简单的方法可以将所有这些链接在一组函数调用(外部源->;我的流程->;redis,我的流程->;kafka)中
# 1 楼答案
如果您对主序列中的结果对象不感兴趣,可以将^{中的两个保存组合起来。您必须将subscribeOn移动并登录到flatMap中,才能将其放在内部存储发布服务器上:
或者,如果确定两个进程都会发出一个结果元素或一个错误,则可以通过用
zip
替换when
将这两个结果组合成一个Tuple2