有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程自定义fork-join池正在java程序中与公共池一起使用

我已经创建了下面的程序,我正在尝试将自定义fork-join池传递到其中,我不想使用公共连接池,但我仍然看到公共池正在使用,即使在传递fork-join池之后。请解释为什么会发生这种情况

package com.example.javanewfeatures;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ForkJoinPoolExample {

    public static void main(String args[]) throws InterruptedException {

        List<Integer> numbers = buildIntRange();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        Thread t1 = new Thread(() -> forkJoinPool.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 1 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        ForkJoinPool forkJoinPool2 = new ForkJoinPool(4);
        Thread t2 = new Thread(() -> forkJoinPool2.submit(() -> {
            numbers.parallelStream().forEach(n -> {
                try {
                    Thread.sleep(5);
                    System.out.println("Loop 2 : " + Thread.currentThread());
                } catch (InterruptedException e) {

                }
            });
        }).invoke());

        t1.start();
        t2.start();
        t1.join();
        t2.join();

    }

    private static List<Integer> buildIntRange() {
        return IntStream.range(0, 10).boxed().collect(Collectors.toUnmodifiableList());
    }

}

共 (1) 个答案

  1. # 1 楼答案

    ...but still I see that common pool is being used even after passing the fork join pool

    当然,当您创建ForkJoinPool的实例时,不会使用公共池。您可以打印以下语句,以确保情况并非如此

    System.out.printf("Common Pool:%s\n", ForkJoinPool.commonPool());
    System.out.printf("Custom Pool:%s\n", new ForkJoinPool(4));
    

    但是在您的例子中,公共池不是由您的任务使用的,而是由Streams用于并行计算

    现在,如果您想让streams使用自定义池,那么您可以引用这个post-Parallel streams in custom pool

    在您当前的实现中,这就是行为

    • 您正在使用ForkJoinPool.submit()将任务提交到池中。这将确保任务在自定义池中执行
    • 但是在你调用的返回任务ForkJoinTask.invoke()上发布。这一次,任务将在非FJ线程的Thread t1上触发,因此任务将提交到公共池。请参阅下面的ForkJoinTask.doInvoke()源代码:
      private int doInvoke() {
          int s; Thread t; ForkJoinWorkerThread wt;
          return (s = doExec()) < 0 ? s :
              ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
              (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
              externalAwaitDone();
      }
    
      private int externalAwaitDone() {
          int s;
          ForkJoinPool cp = ForkJoinPool.common;
      //...
      }
    
    • 如果观察输出,将得到重复的结果。通过提交执行将使用自定义池,通过调用执行将使用公共池。
      ...
      Loop 1 : Thread[ForkJoinPool-2-worker-0,5,main]
      Loop 1 : Thread[ForkJoinPool.commonPool-worker-3,5,main]
      ...
      

    如上文所述,要更正您的实现,您可以进行以下更改

    Thread t2 = new Thread(() -> {
        try {
            forkJoinPool2.submit(() -> {
                numbers.stream().forEach(n -> {
                    try {
                        Thread.sleep(5);
                        System.out.println("Loop 2 : " + Thread.currentThread());
                    } catch (InterruptedException e) {
    
                    }
                });
            }).get(); /*change invoke to get and catch the exception*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });
    

    输出:

    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-2,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-0,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-2,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-0,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-1,5,main]
    Loop 1 : Thread[ForkJoinPool-1-worker-3,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]
    Loop 2 : Thread[ForkJoinPool-2-worker-1,5,main]