有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java从InputStream到并行流<T>

我得到一个包含多个元素的InputStream,它们被一个流以串行方式扫描、解析、迭代(与它们在InputStream中的顺序相同),然后保存在一个DB中。这个很好用

现在,我尝试以并行方式迭代流,使用Stream<T>.parallel(),这样当一个线程被阻塞时,其他线程仍然可以扫描输入流并持久化

然后,我尝试将结果Stream<MyElement>Stream<T>.parallel()并行。为了检查并行化是否有效,我在流中添加了一个map函数,该函数添加了一个随机延迟。我希望生成的元素以随机顺序打印

但结果并不是预期的结果。元素仍按文件顺序显示

有没有一种方法可以正确地并行迭代这个流

public class FromInputStreamToParallelStream {    

    public static Stream<MyElement> getStream(InputStream is) {
        try (var scanner = new Scanner(is)) {
            return scanner//
                    .useDelimiter("DELIMITER")
                    .tokens()
                    .parallel() 
                    .map(MyElementParser::parse);
        }
    }

    @Test
    public void test() throws IOException  {
        try (InputStream in = Files.newInputStream(Paths.get("my-file.xml"));) {
            getStream(in)
                    .map(FromInputStreamToParallelStream::sleepRandom) 
                    .forEach(System.out::println);
        }
    }

    private static MyElement sleepRandom(MyElement element) {
        var randomNumber = new Random().nextInt(10);
        System.out.println("wait. " + randomNumber);

        try {
            TimeUnit.SECONDS.sleep(randomNumber);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return element;
    }
}

我想我需要实现我自己的Spliterator<T>

提前谢谢


共 (0) 个答案