有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

多线程在Java中正确使用ForkJoinPool submit和join

我最近研究了一个外部合并排序算法(External Sorting)的实现,我的实现需要使用多线程方法

我尝试使用ForkJoinPool,而不是使用旧的Java实现,比如Thread和ExecutorService。该算法的第一步需要读取一个文件,每个x行收集并发送,以进行排序并写入文件。此操作(排序和保存)可以在主线程读取下一批数据时在单独的线程中完成。我已经写了一个方法来实现这一点(见下文)

我担心的是,实际的并行工作不是在我使用ForkJoinPool.commonPool().submit(()->SortAndWriteToFile(lines, fileName))时开始的,而是在循环完成后调用task.join()时才开始的。这意味着,在一个足够大的循环中,我将整理要运行的任务,但没有时间运行它们。当我使用invoke而不是submit时,似乎我无法控制join将在哪里,也无法保证在继续之前完成所有工作

有没有更正确的方法来实现这一点

我的代码如下。列出了该方法和两种实用方法。我希望这不会太长

protected int generateSortedFiles (String originalFileName, String destinationFilePrefix) {

    //Number of accumulated sorted blocks of size blockSize
    int blockCount = 0;

    //hold bufferSize number of lines from the file
    List<String> bufferLines = new ArrayList<String>();

    List<ForkJoinTask<?>> taskList = new ArrayList<ForkJoinTask<?>>();

    //Open file to read
    try (Stream<String> fileStream = Files.lines(Paths.get(originalFileName))) {

        //Iterate over BufferSize lines to add them to list.
        Iterator<String> lineItr = fileStream.iterator();

        while(lineItr.hasNext()) {

            //Add bufferSize lines to List
            for (int i=0;i<bufferSize;i++) {
                if (lineItr.hasNext()) {
                    bufferLines.add(lineItr.next());
                }
            }

            //submit the task to sort and write to file in a separate thread
            String fileName= destinationFilePrefix+blockCount+".csv";
            List<String> lines = Collections.unmodifiableList(bufferLines);
            taskList.add(ForkJoinPool.commonPool().submit(
                    ()->SortAndWriteToFile(lines, fileName)));

            blockCount++;
            bufferLines = new ArrayList<String>();
        }
    } catch (IOException e) {
        System.out.println("read from file " +originalFileName + "has failed due to "+e);
    } catch (ArrayIndexOutOfBoundsException e) {
        System.out.println("the index prodived was not available in the file "
                +originalFileName+" and the error is "+e);
    }
    flushParallelTaskList(taskList);

    return blockCount;
}

/**
 * This method takes lines, sorts them and writes them to file
 * @param lines the lines to be sorted
 * @param fileName the filename to write them to
 */
private void SortAndWriteToFile(List<String> lines, String fileName) {
    //Sort lines
    lines = lines.stream()
            .parallel()
            .sorted((e1,e2) -> e1.split(",")[indexOfKey].compareTo(e2.split(",")[indexOfKey]))
            .collect(Collectors.toList());

    //write the sorted block of lines to the destination file.      
    writeBuffer(lines, fileName);

}

/**
 * Wait until all the threads finish, clear the list
 * @param writeList
 */
private void flushParallelTaskList (List<ForkJoinTask<?>> writeList) {
    for (ForkJoinTask<?> task:writeList) {
        task.join();
    }
    writeList.clear();

}

共 (0) 个答案