有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Flux doOnEach中的java同步

让我们看看这个例子:

class ReactiveApp {

    static volatile int count = 0;
    static final Lock lock = new ReentrantLock();
    static final CountDownLatch latch = new CountDownLatch(2);

    public static void main(String[] args) throws Exception {

        Flux.range(0, 100_000)
                .parallel()
                .runOn(Schedulers.single())
                .doOnEach(i -> {
                    lock.lock();
                    count++;
                    lock.unlock();
                })
                .doOnComplete(() -> latch.countDown())
                .subscribe();

        Flux.range(0, 100_000)
                .parallel()
                .runOn(Schedulers.single())
                .doOnEach(i -> {
                    lock.lock();
                    count++;
                    lock.unlock();
                })
                .doOnComplete(() -> latch.countDown())
                .subscribe();

        latch.await();

        System.out.println(count);

    }
}

上面的代码以同步方式在每个元素上增加count。我希望System.out.println(count);打印200000。我的代码怎么了


共 (1) 个答案

  1. # 1 楼答案

    因为你没有在问题中具体说明问题,我假设这个数字比你预期的要小得多

    这是由于您在并行通量上使用onComplete

    Run the specified runnable when a 'rail' completes.

    请注意,它表示“轨道”何时完成,而不是所有轨道何时完成

    这意味着您的闩锁正在倒计时一个“轨道”完成的瞬间

    对于较小的数字,您可能会看到它是总的/num cpu内核数(或者如果显式设置,则为rails数)

    由于多条轨道以略微不同的速率运行,数字越大,其随机性就越大

    无论如何,一个解决方案是在使用sequential()调用onComplete之前将parallelFlux合并在一起,因此只有当所有rails都完成时才调用runnable

        Flux.range(0, 100_000)
                .parallel()
                .runOn(Schedulers.single())
                .doOnNext(i -> {
                    lock.lock();
                    count.incrementAndGet();
                    lock.unlock();
                })
                .sequential()
                .doOnComplete(latch::countDown)
                .subscribe();
    

    还有一些关于上述代码的其他注释

    doOnNext用于防止事件完成时计数也增加
    count我已更改为原子整数并调用incrementAndGet(),以提供更好的线程安全性