有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

带有前置和后置操作的Java Streams forEach

当使用Stream.forEach()时,我在想,当流不是空的时候,是否不能添加要执行的预操作和后操作。例如,当打印一个列表时,当流为空时,可以预加某些内容或写入其他内容

现在我想出了类似的办法

private static <T> void forEach(Stream<T> stream, Consumer<? super T> action,
    Runnable preAction, Runnable postAction, Runnable ifEmpty) {
    AtomicBoolean hasElements = new AtomicBoolean(false);
    Consumer<T> preActionConsumer = x -> {
        if (hasElements.compareAndSet(false, true)) {
            preAction.run();
        }
    };
    stream.forEach(preActionConsumer.andThen(action));
    if (hasElements.get()) {
        postAction.run();
    } else {
        ifEmpty.run();
    }
}

对于顺序流,这应该有效,不是吗? 这个方法正确吗?有这样的方法是“好主意”还是有任何警告

这对并行流不起作用,因为preAction可能比执行action的另一个线程慢,但在不使用同步或其他并发UTIL的情况下正确实现它可能并不容易,因为同步或其他并发UTIL不符合并行流的目的

编辑:添加用例。使用正则表达式从文件中读取搜索整数并将其写入另一个文件。使用这种方法,我不必在内存中创建字符串,然后将其写入某个文件。(显然,对于我真正的任务,我使用的是更复杂的正则表达式。)

public static void main(String[] args) throws IOException {
    Stream<String> lines = Files.lines(Paths.get("foo.txt"));

    Pattern findInts = Pattern.compile("(\\d+)");
    Path barFile = Paths.get("bar.txt");
    try (BufferedWriter writer = Files.newBufferedWriter(barFile , StandardOpenOption.CREATE_NEW)) {
        lines.flatMap(x -> findInts.matcher(x).results())
                .forEach(x-> convertCheckedIOException(() ->  {
                            writer.write(x.group(1));
                            writer.newLine();
                        })
                );
    }
}

public static void convertCheckedIOException(Run r) {
    try {
        r.run();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}

interface Run {
    void run() throws IOException;
}

共 (2) 个答案

  1. # 1 楼答案

    使用适合你工作的工具。此任务不受益于流API

    Pattern intPattern = Pattern.compile("\\d+");
    try(Scanner scanner = new Scanner(Paths.get("foo.txt"));
        BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) {
    
        String s = scanner.findWithinHorizon(intPattern, 0);
        if(s == null) {
            // perform empty action
        } else {
            // perform pre action
            do {
                writer.append(s);
                writer.newLine();
            } while( (s=scanner.findWithinHorizon(intPattern, 0)) != null);
            // perform post action
        }
    }
    

    你仍然可以引入流操作,例如

    Pattern intPattern = Pattern.compile("\\d+");
    try(Scanner scanner = new Scanner(Paths.get("foo.txt"));
        BufferedWriter writer = Files.newBufferedWriter(Paths.get("bar.txt"), CREATE_NEW)) {
    
        String firstLine = scanner.findWithinHorizon(intPattern, 0);
        if(firstLine == null) {
            // perform empty action
        } else {
            // perform pre action
            Stream.concat(Stream.of(firstLine),
                          scanner.findAll(intPattern).map(MatchResult::group))
                .forEach(line -> convertCheckedIOException(() ->  {
                        writer.write(line);
                        writer.newLine();
                    })
                );
            // perform post action
        }
    }
    

    但是必须处理选中的IOException只会使代码复杂化,没有任何好处

  2. # 2 楼答案

    我喜欢有这样一个想法。起初,我认为使用第二个由preaction设置/取消设置的标志并停止该操作可能就足够了。但更复杂的是,每个动作调用之前都会有预动作,而不仅仅是第一个

    我提出了一个同步解决方案,它强制执行preactionspost/empty顺序。需要注意的一点是,第一批并行线程必须等待第一批线程完成,因为它们将运行到^{

    private static <T> void forEach(Stream<T> stream, Consumer<? super T> action, Runnable preAction, Runnable postAction, Runnable ifEmpty)
    {
        AtomicBoolean hasElements = new AtomicBoolean(false);
    
        stream.forEach(new Consumer<T>()
        {
            private Consumer<? super T> delegate = new Consumer<T>()
            {
                private Consumer<? super T> delegate2 = new Consumer<T>()
                {
                    @Override
                    public void accept(T x)
                    {
                        System.out.println("check");
                        hasElements.set(true);
                        preAction.run();
                        action.accept(x);
                        delegate2 = action; // rest of first batch won't run preAction anymore
                        delegate = action; // next batches won't even synchronize anymore
                    }
                };
    
                @Override
                public void accept(T x)
                {
                    synchronized (this)
                    {
                        delegate2.accept(x);
                    }
                }
            };
    
            @Override
            public void accept(T x)
            {
                delegate.accept(x);
            }
        });
    
        if (hasElements.get()) { postAction.run(); } else { ifEmpty.run(); }
    }
    
    public static void main(String[] args)
    {
        Stream<Integer> s = Stream.generate(() -> 1).limit(1000).parallel();
        forEach(s, i -> System.out.println(Thread.currentThread().getId()), () -> System.out.println("pre"),
                () -> System.out.println("post"), () -> System.out.println("empty"));
    }
    
    Output:
    check
    pre
    ...
    many thread IDs
    ...
    post