有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

scala Java 8流,首尾相连

Java 8引入了一个类似于Scala的StreamStream类,这是一个强大的惰性构造,使用它可以非常简洁地执行以下操作:

def from(n: Int): Stream[Int] = n #:: from(n+1)

def sieve(s: Stream[Int]): Stream[Int] = {
  s.head #:: sieve(s.tail filter (_ % s.head != 0))
}

val primes = sieve(from(2))

primes takeWhile(_ < 1000) print  // prints all primes less than 1000

我想知道是否有可能在Java 8中实现这一点,所以我写了如下内容:

IntStream from(int n) {
    return IntStream.iterate(n, m -> m + 1);
}

IntStream sieve(IntStream s) {
    int head = s.findFirst().getAsInt();
    return IntStream.concat(IntStream.of(head), sieve(s.skip(1).filter(n -> n % head != 0)));
}

IntStream primes = sieve(from(2));

相当简单,但它会生成java.lang.IllegalStateException: stream has already been operated upon or closed,因为findFirst()skip()都是Stream上的终端操作,只能执行一次

我真的不必两次用完流,因为我只需要流中的第一个数字,其余的作为另一个流,即相当于Scala的Stream.headStream.tail。Java 8Stream中是否有一种方法可以用来实现这一点

谢谢


共 (6) 个答案

  1. # 1 楼答案

    即使没有无法拆分IntStream的问题,代码也无法工作,因为您是递归地调用sieve方法,而不是惰性地调用。因此,在查询结果流中的第一个值之前,需要进行无限递归

    将一个IntStream s分为一个头部和一个尾部IntStream(尚未消耗)是可能的:

    PrimitiveIterator.OfInt it = s.iterator();
    int head = it.nextInt();
    IntStream tail = IntStream.generate(it::next).filter(i -> i % head != 0);
    

    在这个地方,您需要一个在尾部懒洋洋地调用sieve的构造Stream没有规定concat期望现有的流实例作为参数,而您不能用lambda表达式延迟地构造调用sieve的流,因为延迟创建仅适用于lambda表达式不支持的可变状态。如果没有隐藏可变状态的库实现,则必须使用可变对象。但一旦您接受可变状态的要求,解决方案可能比第一种方法更简单:

    IntStream primes = from(2).filter(i -> p.test(i)).peek(i -> p = p.and(v -> v % i != 0));
    
    IntPredicate p = x -> true;
    
    IntStream from(int n)
    {
      return IntStream.iterate(n, m -> m + 1);
    }
    

    这将递归地创建一个过滤器,但最终不管你是创建一个IntPredicate树还是一个IntStream树(就像你的IntStream.concat方法,如果它确实有效的话)。如果不喜欢过滤器的可变实例字段,可以将其隐藏在内部类中(但不能隐藏在lambda表达式中)

  2. # 2 楼答案

    下面的解决方案不进行状态突变,除了流的头/尾解构

    懒散感是通过IntStream获得的。迭代Prime类用于保持生成器的状态

        import java.util.PrimitiveIterator;
        import java.util.stream.IntStream;
        import java.util.stream.Stream;
    
        public class Prime {
            private final IntStream candidates;
            private final int current;
    
            private Prime(int current, IntStream candidates)
            {
                this.current = current;
                this.candidates = candidates;
            }
    
            private Prime next()
            {
                PrimitiveIterator.OfInt it = candidates.filter(n -> n % current != 0).iterator();
    
                int head = it.next();
                IntStream tail = IntStream.generate(it::next);
    
                return new Prime(head, tail);
            }
    
            public static Stream<Integer> stream() {
                IntStream possiblePrimes = IntStream.iterate(3, i -> i + 1);
    
                return Stream.iterate(new Prime(2, possiblePrimes), Prime::next)
                             .map(p -> p.current);
            }
        }
    

    用法如下:

    Stream<Integer> first10Primes = Prime.stream().limit(10)
    
  3. # 3 楼答案

    这里提供了许多有趣的建议,但如果有人需要一个不依赖于第三方库的解决方案,我提出了以下建议:

        import java.util.AbstractMap;
        import java.util.Optional;
        import java.util.Spliterators;
        import java.util.stream.StreamSupport;
    
        /**
         * Splits a stream in the head element and a tail stream.
         * Parallel streams are not supported.
         * 
         * @param stream Stream to split.
         * @param <T> Type of the input stream.
         * @return A map entry where {@link Map.Entry#getKey()} contains an
         *    optional with the first element (head) of the original stream
         *    and {@link Map.Entry#getValue()} the tail of the original stream.
         * @throws IllegalArgumentException for parallel streams.
         */
        public static <T> Map.Entry<Optional<T>, Stream<T>> headAndTail(final Stream<T> stream) {
            if (stream.isParallel()) {
                throw new IllegalArgumentException("parallel streams are not supported");
            }
            final Iterator<T> iterator = stream.iterator();
            return new AbstractMap.SimpleImmutableEntry<>(
                    iterator.hasNext() ? Optional.of(iterator.next()) : Optional.empty(),
                    StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false)
            );
        }
    
  4. # 4 楼答案

    基本上可以这样实现:

    static <T> Tuple2<Optional<T>, Seq<T>> splitAtHead(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        return tuple(it.hasNext() ? Optional.of(it.next()) : Optional.empty(), seq(it));
    }
    

    在上面的例子中,Tuple2Seq是从jOOλ借来的类型,这是我们为jOOQ集成测试开发的库。如果不需要任何其他依赖项,不妨自己实现它们:

    class Tuple2<T1, T2> {
        final T1 v1;
        final T2 v2;
    
        Tuple2(T1 v1, T2 v2) {
            this.v1 = v1;
            this.v2 = v2;
        }
    
        static <T1, T2> Tuple2<T1, T2> tuple(T1 v1, T2 v2) {
            return new Tuple<>(v1, v2);
        }
    }
    
    static <T> Tuple2<Optional<T>, Stream<T>> splitAtHead(Stream<T> stream) {
        Iterator<T> it = stream.iterator();
        return tuple(
            it.hasNext() ? Optional.of(it.next()) : Optional.empty,
            StreamSupport.stream(Spliterators.spliteratorUnknownSize(
                it, Spliterator.ORDERED
            ), false)
        );
    }
    
  5. # 5 楼答案

    如果您不介意使用第三方库cyclops-streams,我写的库有很多潜在的解决方案

    StreamUtils类有大量静态方法,用于直接与java.util.stream.Streams一起工作,包括headAndTail

    HeadAndTail<Integer> headAndTail = StreamUtils.headAndTail(Stream.of(1,2,3,4));
    int head = headAndTail.head(); //1
    Stream<Integer> tail = headAndTail.tail(); //Stream[2,3,4]
    

    Streamable类表示一个可重放的Stream,它通过构建一个懒惰的、高速缓存的中间数据结构来工作。因为它是缓存和可偿还的——头和尾可以直接单独实现

    Streamable<Integer> replayable=  Streamable.fromStream(Stream.of(1,2,3,4));
    int head = repayable.head(); //1
    Stream<Integer> tail = replayable.tail(); //Stream[2,3,4]
    

    cyclops-streams还提供了一个顺序的Stream扩展,该扩展反过来扩展jOOλ,并具有基于Tuple的(来自jOOλ)和域对象(HeadAndTail)的头尾提取解决方案

    SequenceM.of(1,2,3,4)
             .splitAtHead(); //Tuple[1,SequenceM[2,3,4]
    
    SequenceM.of(1,2,3,4)
             .headAndTail();
    

    根据Tagir的请求进行更新->;使用SequenceM的Scala筛的Java版本

    public void sieveTest(){
        sieve(SequenceM.range(2, 1_000)).forEach(System.out::println);
    }
    
    SequenceM<Integer> sieve(SequenceM<Integer> s){
    
        return s.headAndTailOptional().map(ht ->SequenceM.of(ht.head())
                                .appendStream(sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                        .orElse(SequenceM.of());
    }
    

    另一个版本是通过Streamable

    public void sieveTest2(){
        sieve(Streamable.range(2, 1_000)).forEach(System.out::println);
    }
    
    Streamable<Integer> sieve(Streamable<Integer> s){
    
        return s.size()==0? Streamable.of() : Streamable.of(s.head())
                                                        .appendStreamable(sieve(s.tail()
                                                                        .filter(n -> n % s.head() != 0)));
    }
    

    注意SequenceM中的Streamable都没有空的实现——因此对Streamable进行大小检查并使用headAndTailOptional

    最后是一个使用普通java.util.stream.Stream

    import static com.aol.cyclops.streams.StreamUtils.headAndTailOptional;
    
    public void sieveTest(){
        sieve(IntStream.range(2, 1_000).boxed()).forEach(System.out::println);
    }
    
    Stream<Integer> sieve(Stream<Integer> s){
    
        return headAndTailOptional(s).map(ht ->Stream.concat(Stream.of(ht.head())
                                ,sieve(ht.tail().filter(n -> n % ht.head() != 0))))
                        .orElse(Stream.of());
    }
    

    另一个更新——基于@Holger版本的惰性迭代,使用对象而不是原语(注意,原语版本也是可能的)

      final Mutable<Predicate<Integer>> predicate = Mutable.of(x->true);
      SequenceM.iterate(2, n->n+1)
               .filter(i->predicate.get().test(i))
               .peek(i->predicate.mutate(p-> p.and(v -> v%i!=0)))
               .limit(100000)
               .forEach(System.out::println);
    
  6. # 6 楼答案

    我的StreamEx库现在有^{}操作,它解决了这个问题:

    public static StreamEx<Integer> sieve(StreamEx<Integer> input) {
        return input.headTail((head, tail) -> 
            sieve(tail.filter(n -> n % head != 0)).prepend(head));
    }
    

    headTail方法采用BiFunction,在流终端操作执行期间最多执行一次。所以这个实现是懒惰的:它在遍历开始之前不计算任何东西,只计算请求的素数。BiFunction接收第一个流元素head和其余元素的流tail,并且可以以它想要的任何方式修改tail。您可以将其与预定义的输入一起使用:

    sieve(IntStreamEx.range(2, 1000).boxed()).forEach(System.out::println);
    

    但无限流也能起作用

    sieve(StreamEx.iterate(2, x -> x+1)).takeWhile(x -> x < 1000)
         .forEach(System.out::println);
    // Not the primes till 1000, but 1000 first primes
    sieve(StreamEx.iterate(2, x -> x+1)).limit(1000).forEach(System.out::println);
    

    还有另一种解决方案,使用headTail和谓词连接:

    public static StreamEx<Integer> sieve(StreamEx<Integer> input, IntPredicate isPrime) {
        return input.headTail((head, tail) -> isPrime.test(head) 
                ? sieve(tail, isPrime.and(n -> n % head != 0)).prepend(head)
                : sieve(tail, isPrime));
    }
    
    sieve(StreamEx.iterate(2, x -> x+1), i -> true).limit(1000).forEach(System.out::println);
    

    比较递归解很有趣:它们能生成多少素数

    @John McClean解决方案(StreamUtils

    约翰·麦克莱恩(John McClean)的解决方案并不懒惰:你不能用无限的流量来满足他们。所以我只是通过反复试验找到了最大允许上限(17793)(在发生StackOverflower错误之后):

    public void sieveTest(){
        sieve(IntStream.range(2, 17793).boxed()).forEach(System.out::println);
    }
    

    @John McClean解决方案(Streamable

    public void sieveTest2(){
        sieve(Streamable.range(2, 39990)).forEach(System.out::println);
    }
    

    将上限增加到39990以上会导致堆栈溢出错误

    @frhack解决方案(LazySeq

    LazySeq<Integer> ints = integers(2);
    LazySeq primes = sieve(ints); // sieve method from @frhack answer
    primes.forEach(p -> System.out.println(p));
    

    结果:在质数=53327之后卡住,巨大的堆分配和垃圾收集占90%以上。从53323上升到53327需要几分钟的时间,因此等待更多时间似乎不切实际

    @vidi解决方案

    Prime.stream().forEach(System.out::println);
    

    结果:质数后的StackOverflowerr=134417

    我的解决方案(StreamEx)

    sieve(StreamEx.iterate(2, x -> x+1)).forEach(System.out::println);
    

    结果:质数后的StackOverflowerr=236167

    @frhack解决方案(rxjava

    Observable<Integer> primes = Observable.from(()->primesStream.iterator());
    primes.forEach((x) -> System.out.println(x.toString()));            
    

    结果:质数后的StackOverflowerr=367663

    @Holger解决方案

    IntStream primes=from(2).filter(i->p.test(i)).peek(i->p=p.and(v->v%i!=0));
    primes.forEach(System.out::println);
    

    结果:质数后的StackOverflowerr=368089

    我的解决方案(带谓词连接的StreamEx)

    sieve(StreamEx.iterate(2, x -> x+1), i -> true).forEach(System.out::println);
    

    结果:质数后的StackOverflowerr=368287


    因此,三个涉及谓词连接的解决方案获胜,因为每个新条件只会再添加两个堆栈帧。我认为,两者之间的差异是微不足道的,不应被视为定义一个赢家。不过,我更喜欢我的第一个StreamEx解决方案,因为它更类似于Scala代码