有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java为什么subscribeOn方法不切换上下文?

我的测试基于4.5.2. The subscribeOn Method(忽略缺少的new Thread(...)部分):

    @Test
    public void theSubscribeOnMethod() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i)
                .subscribeOn(s)
                .doOnSubscribe(sub -> System.out.println(
                        "[doOnSubscribe] " + Thread.currentThread().getName()))
                .map(i -> Thread.currentThread().getName() + ", value " + i);

        flux.subscribe(System.out::println);

        Thread.sleep(1000);
    }

它打印了这个(注意[doOnSubscribe]main线程):

[doOnSubscribe] main
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

仅仅在doOnSubscribe之后移动subscribeOn就产生了这个结果(在我看来是正确的):

[doOnSubscribe] parallel-scheduler-1
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

根据Flux.subscribeOnjavadoc:

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.

因此,在本例中,我希望doOnSubscribe发生在parallel-scheduler线程上,而不是main,尽管我放置了subscribeOn

这是虫子吗?为什么会发生这种情况,我应该如何解决

PS:使用reactor-core:3.1.0-RELEASE


共 (1) 个答案

  1. # 1 楼答案

    这不是虫子。这是预期的行为。这是事件的顺序,乍一看可能令人困惑

    我稍微修改了你的例子:

    Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
    
    final Flux<String> flux = Flux
            .range(1, 2)
            .map(i -> 10 + i)
            .subscribeOn(s)
            .doOnNext(value -> System.out.println(
                    "[doOnNext] " + Thread.currentThread().getName() + " value: " + value))
            .doOnSubscribe(sub -> System.out.println(
                    "[doOnSubscribe] " + Thread.currentThread().getName()));
    
    flux.subscribe();
    
    Thread.sleep(1000);
    

    输出:

    [doOnSubscribe] main
    [doOnNext] parallel-scheduler-1 value: 11
    [doOnNext] parallel-scheduler-1 value: 12
    

    当你认为必须的时候,你会期望操作符是按照声明顺序被调用的,所以后面的操作符会被调用。函数式/反应式编程通常也是如此,但情况并非总是如此

    在上面的例子中doOnSubscribe实际上比管道中的其他任何东西都要先运行。是运营商先收到关于订阅的通知,然后将订阅转发给其上游运营商,后者将订阅转发给其上游运营商,以此类推

    所以事件发生的顺序是:

    1. flux.subscribe()订阅管道并触发主线程上的执行
    2. doOnSubscribe获取有关订阅的通知,并在主线程上触发提供的使用者,然后将订阅转发到其上游
    3. doOnNext获取有关订阅的通知,但它只对onNext事件感兴趣,因此除了将订阅转发到其上游(在主线程上)之外,它还没有做任何事情(!)
    4. subscribeOn(s)-发生上下文切换,之后的每个事件都发生在提供的调度程序的上下文中
    5. 订阅被转发给上游操作员:首先是map,然后是range,两者都在并行线程上
    6. range是源代码,因此它在并行线程上发出第一项:onNext(1)
    7. maponNext事件上触发,进行转换,并将转换后的元素作为并行线程上的onNext事件发送
    8. subscribeOn除了转发onNext事件之外,不做任何事情
    9. doOnNext消费程序被触发,因为它接收到一个仍然在并行线程上的onNext事件
    10. doOnSubscribe忽略onNext事件,只是将事件转发给下一个操作符
    11. onNext(2)项重复6-10个事件
    12. 流完成

    TLDR:订阅事件从底部运算符传播到顶部运算符(doOnSubscribesusbcribeOn在此传播过程中按此顺序触发),然后实际数据从顶部源运算符(range)反向流向底部

    由于doOnSubscribesusbcribeOn之前运行,它仍将在主线程上运行