有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java RxJava:如何在多个线程中处理事件

例如,我有以下使用RxJava库的代码:

public class MultithreadingExample {
public static void main(String[] args) throws InterruptedException {
    Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
            .observeOn(Schedulers.computation())
            .map(numberToString())
            .subscribe(printResult());
    Thread.sleep(10000);
}

private static Func1<Integer, String> numberToString() {
    return number -> {
        System.out.println("Operator thread: " + Thread.currentThread().getName());
        return String.valueOf(number);
    };
}

private static Action1<String> printResult() {
    return result -> {
        System.out.println("Subscriber thread: " + Thread.currentThread().getName());
        System.out.println("Result: " + result);
    };
}

}

我希望观察者中的事件由多个线程处理,例如,项目“1”由线程-1处理,项目“2”由线程-2处理,等等

使用RxJava的最佳方式是什么


共 (1) 个答案

  1. # 1 楼答案

    可以使用flatMap()运算符

    Observable.from(Lists.newArrayList(1, 2, 3, 4, 5))
            .flatMap( number -> Observable.defer( numberToString() )
                                  .subscribeOn( Schedulers.computation() ) )
            .observeOn(Schedulers.computation())
            .map(numberToString())
            .subscribe(printResult());
    

    flatMap()操作符将订阅(可能是新的)线程上的新可观察对象,将结果合并回执行最终observeOn()的线程