java ForkJoinPool为什么程序抛出OutOfMemoryError?
我想在Java8中试用ForkJoinPool,所以我编写了一个小程序,用于搜索给定目录中名称包含特定关键字的所有文件
节目:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
if(mainDirectory.canRead()) {
File[] fileList = mainDirectory.listFiles();
for(File file : fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory()) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();
} else {
if (file.getName().contains("hello")) {
System.out.println(file.getName());
filetedFileList.add(file.getName());
}
}
}
}
for(FileSearchRecursiveTask task : recursiveTasks) {
filetedFileList.addAll(task.join());
}
}
return filetedFileList;
}
}
当目录没有太多的子目录和文件时,这个程序运行良好,但如果它真的很大,那么它会抛出OutOfMemoryError
我的理解是,最大线程数(包括补偿线程)是有界的,那么为什么会出现这种错误呢?我的程序有什么遗漏吗
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
# 1 楼答案
你不应该把新任务做得面目全非。基本上,只要有可能另一个工作线程可以选择分叉的工作并在本地进行评估,就应该分叉。然后,一旦你完成了一项任务,就不要马上调用
join()
。虽然底层框架将启动补偿线程,以确保您的作业将继续进行,而不是让所有线程在等待子任务时被阻塞,但这将创建大量可能超出系统能力的线程以下是您的代码的修订版本:
进行处理的方法已被分解为一个接收目录作为参数的方法,因此我们能够在本地将其用于不一定与
FileSearchRecursiveTask
实例关联的任意目录然后,该方法使用^{} 来确定本地排队的任务数量,这些任务没有被其他工作线程拾取。确保有一些因素有助于平衡工作。但如果这个数字超过阈值,处理将在本地完成,而不会产生更多作业
在本地处理之后,它对任务进行迭代,并使用^{} 来识别没有被其他工作线程窃取的作业,并在本地处理它们。从最年轻的工作开始,向后迭代可以增加找到一些工作的机会
只有在这之后,它才会处理所有子作业,这些子作业现在或者已经完成,或者正在由另一个工作线程处理
注意,我更改了启动代码以使用默认池。它使用“CPU内核数”减去一个工作线程,再加上启动线程,即本例中的
main
线程# 2 楼答案
只需要一点小小的改变。 您需要为newWorkStealingPool指定如下并行性:
根据其文件:
根据附带的Java Visual VM屏幕截图,这种并行性允许程序在指定的内存内工作,并且永远不会耗尽内存。
还有一件事(不确定它是否会产生任何效果):
更改调用fork和将任务添加到列表的顺序。也就是说,改变
到