java RxJava 2将事件并行化以执行,并产生副作用
我希望并行执行,而不在最后聚合事件。我是RxJava新手,正在尝试评估它是否适合我的需要
我知道如何执行正常的非并行执行。为了简单起见,让我们从以下内容开始:
taskReader.stream() // returns Flowable<Task>
.subscribe(this::processTask) // sends results to another micro service
这是一种有序的阻塞方式。然而,我想将其并行化,并同时执行所有任务。文档基本上告诉你使用平面图,这是有道理的,但我不知道如何运行整个过程。让我们看看:
taskReader.stream() // returns Flowable<Task>
.flatMap(
Flowable.just(task)
.subscribeOn(Schedulers.computation())
.map(this::processTask)
)
这就是我所需要的,但这当然不会启动Flowable,因为它没有订阅任何内容[1]。我所有的任务都是不相关的,所以我不需要再次将它们聚合到顶级流中,而且我当然不在乎它们的顺序
你是怎么开始这样的流动的?我不想订阅顶级Flowable,因为在那里我不需要做任何其他事情
或者,文档会告诉您使用并行流:
taskReader.stream() // returns Flowable<Task>
.parallel()
.runOn(Schedulers.computation())
.map(this::processTask)
.sequential();
再说一次,我不需要在结尾排序任何东西,因为我不在乎顺序。另外,我不想订阅这个,因为地图上的工作就是我所需要的
我真正想要的是:
taskReader.stream() // returns Flowable<Task>
.flatMap(
Flowable.just(task)
.subscribeOn(Schedulers.computation())
.subscribe(this::processTask)
)
但RxJava不是这样工作的
帮忙
一些背景:我从一个队列(特别是AWS SQS)接收事件,其中的事件彼此无关。对于每一个,我都需要执行一些I/O密集型工作,然后是一些CPU密集型工作,最后将结果发送到不同的系统。我想并行运行所有这些事件,所以我真的不需要为计算和io工作安排好时间。我以为RxJava会在这方面帮助我,但也许我在努力完成第一步时,尝试使用了错误的工具
[1]这是我们从文件中得到的信息,除非我找错了。这真的是我们仅有的文档吗?我希望有类似反应堆的东西:https://projectreactor.io/docs/core/release/reference/
# 1 楼答案
将processTask()更改为类似的可流动流
因此,processTask()基本上也会返回一个可流动的数据流
这就是现在使用flatMap()的方式
希望这有帮助