有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程如何减少Java parallelStream中的#个线程?

我有一个带有parallelStream()的测试代码,它向服务器机器发送请求

Report report = 
    requestsList.parallelStream()
                .map(request -> freshResultsGenerator.getResponse(request, e2EResultLongBL))
                .map(response -> resultsComparer.compareToBl(response, e2EResultLongBL,
                            astarHistogramsArrayBl, latencyHistogramBl))
                .reduce(null,
                        (sumReport, compare2) ->
                        {
                            if (sumReport == null) {
                                sumReport = new Report();
                            }
                            sumReport.add(compare2);
                            return sumReport;
                        },
                        (report1, report2) ->
                        {
                            Report report3 = new Report();
                            report3.add(report1);
                            report3.add(report2);
                            return report3;
                        });

这台机器的负载太大,很快就会返回HTTP 404错误

有两件事我在谷歌上找不到答案:

  1. 如果没有定制,parallelStream的默认线程数是多少 设定
  2. 如何将辅助线程的数量设置为(比如)4

共 (3) 个答案

  1. # 1 楼答案

    流API使用^{}执行并发任务。引用其文件:

    The common pool is by default constructed with default parameters, but these may be controlled by setting three system properties:

    • java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer
      ...

    还有

    a ForkJoinPool may be constructed with a given target parallelism level; by default, equal to the number of available processors.

    因此,要自定义线程数,可以将系统属性java.util.concurrent.ForkJoinPool.common.parallelism设置为所需的值:

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4")
    

    将工作线程数设置为4。默认情况下,线程数将等于您拥有的处理器数

  2. # 2 楼答案

    ParallelStream使用ForkJoinPool.commonPool(),它被初始化为您的内核数-1

    可以按照here所述将自己的ForkJoin Executor传递给parallelStream,但是执行器必须是ForkJoin,这对于IO绑定的任务来说不是最好的

  3. # 3 楼答案

    您可以在自定义ForkJoinPool中启动管道,并使用它来获得结果:

    ForkJoinPool fjp = new ForkJoinPool(2);
    System.out.println("My pool: " + fjp);
    String result = CompletableFuture.supplyAsync(
        () -> Stream.of("a", "b", "c").parallel()
        .peek(x -> System.out.println(
            ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
        .collect(Collectors.joining()), fjp).join();
    System.out.println(result);
    

    我的StreamEx库添加了一个语法糖方法.parallel(fjp),以简化这一过程:

    String result = StreamEx.of("a", "b", "c")
        .parallel(fjp)
        .peek(x -> System.out.println(
            ((ForkJoinWorkerThread) Thread.currentThread()).getPool()))
        .collect(Collectors.joining());