有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RxJava 2将事件并行化以执行,并产生副作用

我希望并行执行,而不在最后聚合事件。我是RxJava新手,正在尝试评估它是否适合我的需要

我知道如何执行正常的非并行执行。为了简单起见,让我们从以下内容开始:

taskReader.stream() // returns Flowable<Task>
    .subscribe(this::processTask) // sends results to another micro service

这是一种有序的阻塞方式。然而,我想将其并行化,并同时执行所有任务。文档基本上告诉你使用平面图,这是有道理的,但我不知道如何运行整个过程。让我们看看:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .map(this::processTask)
    )

这就是我所需要的,但这当然不会启动Flowable,因为它没有订阅任何内容[1]。我所有的任务都是不相关的,所以我不需要再次将它们聚合到顶级流中,而且我当然不在乎它们的顺序

你是怎么开始这样的流动的?我不想订阅顶级Flowable,因为在那里我不需要做任何其他事情

或者,文档会告诉您使用并行流:

taskReader.stream() // returns Flowable<Task> 
    .parallel()
    .runOn(Schedulers.computation())
    .map(this::processTask)
    .sequential();

再说一次,我不需要在结尾排序任何东西,因为我不在乎顺序。另外,我不想订阅这个,因为地图上的工作就是我所需要的

我真正想要的是:

taskReader.stream() // returns Flowable<Task> 
    .flatMap(
        Flowable.just(task)
            .subscribeOn(Schedulers.computation())
            .subscribe(this::processTask)
    )

但RxJava不是这样工作的

帮忙

一些背景:我从一个队列(特别是AWS SQS)接收事件,其中的事件彼此无关。对于每一个,我都需要执行一些I/O密集型工作,然后是一些CPU密集型工作,最后将结果发送到不同的系统。我想并行运行所有这些事件,所以我真的不需要为计算和io工作安排好时间。我以为RxJava会在这方面帮助我,但也许我在努力完成第一步时,尝试使用了错误的工具

[1]这是我们从文件中得到的信息,除非我找错了。这真的是我们仅有的文档吗?我希望有类似反应堆的东西:https://projectreactor.io/docs/core/release/reference/


共 (1) 个答案

  1. # 1 楼答案

    将processTask()更改为类似的可流动流

    public Flowable<ProcessTask> processTask() {
        return Flowable
            .create(
                e -> {
                    //Put the complete code inside this block which you have currently inside your processTask() method
                    //Whatever processed output you get pass it as shown below.
                    e.onNext(//Pass <ProcessTask> object);
                },
                BackpressureStrategy.BUFFER //Choose any BackpressureStrategy which suits your requirement
            )
            .subscribeOn(Schedulers.io());
    }
    

    因此,processTask()基本上也会返回一个可流动的数据流

    这就是现在使用flatMap()的方式

    taskReader
        .stream() // returns Flowable<Task>
        .flatMap(task -> processTask(task))
        .subscribe(
            result -> //Processed task, result from e.onNext(),
            error -> //Error
        );
    

    希望这有帮助