有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java ForkJoinPool为什么程序抛出OutOfMemoryError?

我想在Java8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索给定目录中名称包含特定关键字的所有文件

节目

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total  no of files with hello" + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }

                for(FileSearchRecursiveTask task : recursiveTasks) {
                  filetedFileList.addAll(task.join());
                }

        }
        return filetedFileList;

    }
}

当目录没有太多的子目录和文件时,这个程序运行良好,但如果它真的很大,那么它会抛出OutOfMemoryError

我的理解是,最大线程数(包括补偿线程)是有界的,那么为什么会出现这种错误呢?我的程序有什么遗漏吗

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   

共 (2) 个答案

  1. # 1 楼答案

    你不应该把新任务做得面目全非。基本上,只要有可能另一个工作线程可以选择分叉的工作并在本地进行评估,就应该分叉。然后,一旦你完成了一项任务,就不要马上调用join()。虽然底层框架将启动补偿线程,以确保您的作业将继续进行,而不是让所有线程在等待子任务时被阻塞,但这将创建大量可能超出系统能力的线程

    以下是您的代码的修订版本:

    public class DirectoryService {
    
        public static void main(String[] args) {
            FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
            List<String> files = task.invoke();
            System.out.println("Total no of files with hello " + files.size());
        }
    
    }
    
    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private static final int TARGET_SURPLUS = 3;
        private File path;
        public FileSearchRecursiveTask(File file) {
            this.path = file;
        }
    
        @Override
        protected List<String> compute() {
            File directory = path;
            if(directory.isDirectory() && directory.canRead()) {
                System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
                return scan(directory);
            }
            return Collections.emptyList();
        }
    
        private List<String> scan(File directory)
        {
            File[] fileList = directory.listFiles();
            if(fileList == null || fileList.length == 0) return Collections.emptyList();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            List<String> filteredFileList = new ArrayList<>();
            for(File file: fileList) {
                System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                if(file.isDirectory())
                {
                    if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                    {
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                        recursiveTasks.add(task);
                        task.fork();
                    }
                    else filteredFileList.addAll(scan(file));
                }
                else if(file.getName().contains("hello")) {
                    filteredFileList.add(file.getAbsolutePath());
                }
            }
    
            for(int ix = recursiveTasks.size() - 1; ix >= 0; ix ) {
                FileSearchRecursiveTask task = recursiveTasks.get(ix);
                if(task.tryUnfork()) task.complete(scan(task.path));
            }
    
            for(FileSearchRecursiveTask task: recursiveTasks) {
                filteredFileList.addAll(task.join());
            }
            return filteredFileList;
        }
    }
    

    进行处理的方法已被分解为一个接收目录作为参数的方法,因此我们能够在本地将其用于不一定与FileSearchRecursiveTask实例关联的任意目录

    然后,该方法使用^{}来确定本地排队的任务数量,这些任务没有被其他工作线程拾取。确保有一些因素有助于平衡工作。但如果这个数字超过阈值,处理将在本地完成,而不会产生更多作业

    在本地处理之后,它对任务进行迭代,并使用^{}来识别没有被其他工作线程窃取的作业,并在本地处理它们。从最年轻的工作开始,向后迭代可以增加找到一些工作的机会

    只有在这之后,它才会处理所有子作业,这些子作业现在或者已经完成,或者正在由另一个工作线程处理

    注意,我更改了启动代码以使用默认池。它使用“CPU内核数”减去一个工作线程,再加上启动线程,即本例中的main线程

  2. # 2 楼答案

    只需要一点小小的改变。 您需要为newWorkStealingPool指定如下并行性:

    ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(5);
    

    根据其文件:

    newWorkStealingPool(int parallelism) -> Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

    根据附带的Java Visual VM屏幕截图,这种并行性允许程序在指定的内存内工作,并且永远不会耗尽内存。 enter image description here

    还有一件事(不确定它是否会产生任何效果):

    更改调用fork和将任务添加到列表的顺序。也就是说,改变

    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
    recursiveTasks.add(task);
    task.fork();
    

    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
    task.fork();
    recursiveTasks.add(task);