有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java不支持流。parallel()是否更新拆分器的特性?

这个问题是基于对这个问题的回答What is the difference between Stream.of and IntStream.range?

由于IntStream.range生成一个已经排序的流,下面代码的输出将只生成作为0的输出:

IntStream.range(0, 4)
         .peek(e -> System.out.println(e))
         .sorted()
         .findFirst();

此外,拆分器将具有SORTED特性。下面的代码返回true

System.out.println(
    IntStream.range(0, 4)
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

现在,如果我在第一个代码中引入一个parallel(),那么正如预期的那样,输出将包含从03的所有4个数字,但顺序是随机的,因为流由于parallel()而不再排序

IntStream.range(0, 4)
         .parallel()
         .peek(e -> System.out.println(e))
         .sorted()
         .findFirst();

这将以任何顺序产生如下结果:

2
0
1
3

因此,我希望SORTED属性由于parallel()而被删除。但是,下面的代码也返回true

System.out.println(
    IntStream.range(0, 4)
             .parallel()
             .spliterator()
             .hasCharacteristics(Spliterator.SORTED)
);

为什么parallel()不改变SORTED属性?既然这四个数字都是打印出来的,即使SORTED属性仍然存在,Java如何意识到流没有被排序


共 (2) 个答案

  1. # 1 楼答案

    如何做到这一点在很大程度上是一个实现细节。您必须深入挖掘源代码才能真正了解原因。基本上,并行和顺序管道的处理方式不同。看看^{},它检查isParallel(),然后根据管道是否并行执行不同的操作

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    

    如果再看一下^{},您会发现它覆盖了两个方法:

    @Override
    public Sink<Integer> opWrapSink(int flags, Sink sink) {
        Objects.requireNonNull(sink);
    
        if (StreamOpFlag.SORTED.isKnown(flags))
            return sink;
        else if (StreamOpFlag.SIZED.isKnown(flags))
            return new SizedIntSortingSink(sink);
        else
            return new IntSortingSink(sink);
    }
    
    @Override
    public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                   Spliterator<P_IN> spliterator,
                                                   IntFunction<Integer[]> generator) {
        if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
            return helper.evaluate(spliterator, false, generator);
        }
        else {
            Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
    
            int[] content = n.asPrimitiveArray();
            Arrays.parallelSort(content);
    
            return Nodes.node(content);
        }
    }
    

    opWrapSink如果是顺序管道,则最终将被调用;如果是并行流,则opEvaluateParallel(顾名思义)将被调用。注意opWrapSink如何在管道已经排序的情况下不对给定的接收器执行任何操作(只返回未更改的数据),但是opEvaluateParallel始终对拆分器求值

    还要注意的是,平行度和排序度并不是相互排斥的。您可以拥有具有这些特征的任意组合的流

    “排序”是Spliterator的一个特征。从技术上讲,它不是Stream的特征(就像“并行”一样)。当然,parallel可以用一个全新的拆分器(从原始拆分器获取元素)创建一个具有全新特性的流,但既然可以重用相同的拆分器,为什么要这样做呢?我想在任何情况下,您都必须以不同的方式处理并行流和顺序流

  2. # 2 楼答案

    考虑到ForkJoinPool用于并行流,并且它是基于work stealing工作的,因此您需要后退一步,考虑如何解决这样一个问题。如果您也知道Spliterator是如何工作的,那将非常有帮助。一些细节here

    您有一个特定的流,您将它“拆分”(非常简化)为更小的片段,并将所有这些片段交给ForkJoinPool执行。所有这些零件都是通过单独的螺纹独立加工的。因为我们在这里讨论的是线程,显然没有事件序列,事情是随机发生的(这就是为什么您会看到随机顺序输出)

    如果流保留了顺序,那么终端操作也应该保留它。因此,当中间操作以任何顺序执行时,您的终端操作(如果到该点的流是有序的)将以有序的方式处理元素。简而言之:

    System.out.println(
        IntStream.of(1,2,3)
                 .parallel()
                 .map(x -> {System.out.println(x * 2); return x * 2;})
                 .boxed()
                 .collect(Collectors.toList()));
    

    map将以未知的顺序处理元素(ForkJoinPool和线程,记住这一点),但是collect将以“从左到右”的顺序接收元素


    现在,如果我们将其外推到您的示例中:当您调用parallel时,流被分割成小块并进行处理。例如,看看这是如何分割的(一次)

    Spliterator<Integer> spliterator =
    IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
             .parallel()
             .boxed()
             .sorted()
             .spliterator()
             .trySplit(); // trySplit is invoked internally on parallel
    
    spliterator.forEachRemaining(System.out::println);
    

    在我的机器上它打印1,2,3,4。这意味着内部实现将流分成两个Spliterator部分:leftrightleft[1, 2, 3, 4],右边有[5, 6, 7, 8]。但事实并非如此,这些Spliterator可以进一步拆分。例如:

    Spliterator<Integer> spliterator =
    IntStream.of(5, 4, 3, 2, 1, 5, 6, 7, 8)
             .parallel()
             .boxed()
             .sorted()
             .spliterator()
             .trySplit()
             .trySplit()
             .trySplit();
    
    spliterator.forEachRemaining(System.out::println);
    

    如果您再次尝试调用trySplit,您将得到一个null-意思是,就是这样,我不能再拆分了

    因此,您的流:IntStream.range(0, 4)将被拆分为4个拆分器。所有的工作都是由一根线单独完成的。如果您的第一个线程知道它当前使用的Spliterator是“最左边的一个”,那么就这样!其余线程甚至不需要开始工作-结果是已知的

    另一方面,可能是具有“最左侧”元素的这个Spliterator只在最后启动。所以前三个可能已经完成了(因此在您的示例中调用了peek),但它们没有“产生”所需的结果

    事实上,情况就是这样。您不需要理解代码,但是流和方法名称应该是显而易见的