Flux doOnEach中的java同步
让我们看看这个例子:
class ReactiveApp {
static volatile int count = 0;
static final Lock lock = new ReentrantLock();
static final CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) throws Exception {
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
Flux.range(0, 100_000)
.parallel()
.runOn(Schedulers.single())
.doOnEach(i -> {
lock.lock();
count++;
lock.unlock();
})
.doOnComplete(() -> latch.countDown())
.subscribe();
latch.await();
System.out.println(count);
}
}
上面的代码以同步方式在每个元素上增加count
。我希望System.out.println(count);
打印200000
。我的代码怎么了
# 1 楼答案
因为你没有在问题中具体说明问题,我假设这个数字比你预期的要小得多
这是由于您在并行通量上使用onComplete
请注意,它表示“轨道”何时完成,而不是所有轨道何时完成
这意味着您的闩锁正在倒计时一个“轨道”完成的瞬间
对于较小的数字,您可能会看到它是总的/num cpu内核数(或者如果显式设置,则为rails数)
由于多条轨道以略微不同的速率运行,数字越大,其随机性就越大
无论如何,一个解决方案是在使用
sequential()
调用onComplete之前将parallelFlux合并在一起,因此只有当所有rails都完成时才调用runnable还有一些关于上述代码的其他注释
doOnNext
用于防止事件完成时计数也增加count
我已更改为原子整数并调用incrementAndGet()
,以提供更好的线程安全性