java为什么subscribeOn方法不切换上下文?
我的测试基于4.5.2. The subscribeOn Method(忽略缺少的new Thread(...)
部分):
@Test
public void theSubscribeOnMethod() throws InterruptedException {
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
.map(i -> 10 + i)
.subscribeOn(s)
.doOnSubscribe(sub -> System.out.println(
"[doOnSubscribe] " + Thread.currentThread().getName()))
.map(i -> Thread.currentThread().getName() + ", value " + i);
flux.subscribe(System.out::println);
Thread.sleep(1000);
}
它打印了这个(注意[doOnSubscribe]
的main
线程):
[doOnSubscribe] main
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12
仅仅在doOnSubscribe
之后移动subscribeOn
就产生了这个结果(在我看来是正确的):
[doOnSubscribe] parallel-scheduler-1
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12
根据Flux.subscribeOnjavadoc:
Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.
因此,在本例中,我希望doOnSubscribe
发生在parallel-scheduler
线程上,而不是main
,尽管我放置了subscribeOn
这是虫子吗?为什么会发生这种情况,我应该如何解决
PS:使用reactor-core:3.1.0-RELEASE
# 1 楼答案
这不是虫子。这是预期的行为。这是事件的顺序,乍一看可能令人困惑
我稍微修改了你的例子:
输出:
当你认为必须的时候,你会期望操作符是按照声明顺序被调用的,所以后面的操作符会被调用。函数式/反应式编程通常也是如此,但情况并非总是如此
在上面的例子中
doOnSubscribe
实际上比管道中的其他任何东西都要先运行。是运营商先收到关于订阅的通知,然后将订阅转发给其上游运营商,后者将订阅转发给其上游运营商,以此类推所以事件发生的顺序是:
flux.subscribe()
订阅管道并触发主线程上的执行doOnSubscribe
获取有关订阅的通知,并在主线程上触发提供的使用者,然后将订阅转发到其上游doOnNext
获取有关订阅的通知,但它只对onNext
事件感兴趣,因此除了将订阅转发到其上游(在主线程上)之外,它还没有做任何事情(!)李>subscribeOn(s)
-发生上下文切换,之后的每个事件都发生在提供的调度程序的上下文中map
,然后是range
,两者都在并行线程上range
是源代码,因此它在并行线程上发出第一项:onNext(1)
map
在onNext
事件上触发,进行转换,并将转换后的元素作为并行线程上的onNext
事件发送subscribeOn
除了转发onNext
事件之外,不做任何事情doOnNext
消费程序被触发,因为它接收到一个仍然在并行线程上的onNext
事件doOnSubscribe
忽略onNext
事件,只是将事件转发给下一个操作符onNext(2)
项重复6-10个事件TLDR:订阅事件从底部运算符传播到顶部运算符(
doOnSubscribe
和susbcribeOn
在此传播过程中按此顺序触发),然后实际数据从顶部源运算符(range
)反向流向底部由于
doOnSubscribe
在susbcribeOn
之前运行,它仍将在主线程上运行