Java: Cartesian product in RxJava

I currently have an Observable<Observable<Integer>>. How can I convert it into an Observable<int[]> that emits the n-ary Cartesian product?

For example:

Observable<Observable<Integer>> ob = Observable.just(
  Observable.just(0,1),
  Observable.just(2,3),
  Observable.just(4,5)
  );
ob...... ->   (0,2,4), (0,3,4), (0,2,5), (0,3,5), (1,2,4), (1,3,4), (1,2,5), (1,3,5)

3 answers

  1. # Answer 1

    I managed to solve this myself, but is there a more elegant way?
    (The toArray method converts an Observable<T> into a T[].)

        Observable<int[]> toObservableArray(Observable<Observable<Integer>> obs) {
            List<int[]> list = obs.map(ob -> toArray(ob)).toList().toBlocking().last();
            return Observable.create(new SyncOnSubscribe<int[], int[]>() {
                @Override
                protected int[] generateState() {
                    int[] array = new int[list.size()];
                    Arrays.fill(array, 0);
                    return array;
                }
    
                @Override
                protected int[] next(int[] state, Observer<? super int[]> observer) {
                    int[] next = new int[list.size()];
                    for (int i = 0; i < next.length; i++) {
                        next[i] = list.get(i)[state[i]];
                    }
                    observer.onNext(next);
                    state[state.length - 1]++;
                    for (int i = state.length - 1; i >= 0; i--) {
                        int delta = list.get(i).length - state[i];
                        if (delta > 0) {
                            break;
                        } else if (delta == 0) {
                            state[i] = 0;
                            if (i == 0) {
                                observer.onCompleted();
                                break;
                            }
                            state[i - 1]++;
                        }
                    }
                    return state;
                }
            });
        }
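
The index stepping inside next above behaves like an odometer: the last position moves fastest and carries into the position to its left when it wraps. A minimal sketch of just that stepping in plain Java, without RxJava, might look like the following. The class name OdometerProduct and the helpers advance and product are hypothetical names, and the sources are assumed to be already materialized as int[] arrays.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class OdometerProduct {

    // Advances the index vector like an odometer: the last position moves fastest.
    // Returns false once every combination has been visited.
    static boolean advance(int[] state, List<int[]> sources) {
        for (int i = state.length - 1; i >= 0; i--) {
            state[i]++;
            if (state[i] < sources.get(i).length) {
                return true;   // no carry needed
            }
            state[i] = 0;      // wrap this position and carry into the one to its left
        }
        return false;          // carried out of position 0: all tuples emitted
    }

    static List<int[]> product(List<int[]> sources) {
        List<int[]> out = new ArrayList<>();
        // In this sketch, no sources or any empty source yields no tuples.
        if (sources.isEmpty()) {
            return out;
        }
        for (int[] s : sources) {
            if (s.length == 0) {
                return out;
            }
        }
        int[] state = new int[sources.size()];
        do {
            // Build the tuple selected by the current index vector.
            int[] tuple = new int[state.length];
            for (int i = 0; i < tuple.length; i++) {
                tuple[i] = sources.get(i)[state[i]];
            }
            out.add(tuple);
        } while (advance(state, sources));
        return out;
    }

    public static void main(String[] args) {
        List<int[]> sources = Arrays.asList(
                new int[] {0, 1}, new int[] {2, 3}, new int[] {4, 5});
        for (int[] t : product(sources)) {
            System.out.println(Arrays.toString(t));
        }
    }
}
```

Separating the carry logic into advance keeps the emit step and the termination check apart, which is the part that is easy to get wrong when both live in one loop.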
    
  2. # Answer 2

    Building a Cartesian product asynchronously is hard, and in some sense impossible. If blocking is acceptable, you can do it like this:

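    import java.lang.reflect.Array;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    // RxJava 2.x packages, assumed from the use of fromIterable and blockingGet
    import io.reactivex.Observable;
    import io.reactivex.Single;
    
    public class Main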
    {
    
        static class ProductIterator<T> implements Iterator<T[]>
        {
            private final List<List<T>> componentsList;
            private final Class<T> componentClass;
            private final int[] indices;
            private boolean hasNext;
    
            public ProductIterator(List<List<T>> componentsList, Class<T> componentClass)
            {
                this.componentsList = componentsList;
                this.componentClass = componentClass;
                this.indices = new int[componentsList.size()];
                this.hasNext = this.indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();
            }
    
            @Override
            public boolean hasNext()
            {
                return hasNext;
            }
    
            @Override
            public T[] next()
            {
                T[] res = (T[]) Array.newInstance(componentClass, componentsList.size());
                for (int i = 0; i < componentsList.size(); i++)
                {
                    res[i] = componentsList.get(i).get(indices[i]);
                }
    
                // move next
                indices[0]++;
                for (int i = 0; i < componentsList.size() - 1; i++)
                {
                    if (indices[i] == componentsList.get(i).size())
                    {
                        indices[i] = 0;
                        indices[i + 1]++;
                    }
                }
                hasNext = indices[componentsList.size() - 1] < componentsList.get(componentsList.size() - 1).size();
    
                return res;
            }
        }
    
        public static <T> Observable<T[]> product(Observable<Observable<T>> components, Class<T> componentClass)
        {
            return Observable.fromIterable(new Iterable<T[]>()
            {
                @Override
                public Iterator<T[]> iterator()
                {
                    // postpone blocking up until iterator is requested 
                    // and by this point we can't postpone anymore 
                    Single<List<List<T>>> componentsList = components.map(o -> o.toList().blockingGet()).toList();
                    return new ProductIterator<T>(componentsList.blockingGet(), componentClass);
                }
            });
        }
    
        public static void main(String[] args) throws Exception
        {
            Observable<Observable<Integer>> ob = Observable.just(
                    Observable.just(0, 1),
                    Observable.just(2, 3),
                    Observable.just(4, 5)
            );
    
            Observable<Integer[]> product = product(ob, Integer.class);
            product.forEach(a -> System.out.println(Arrays.toString(a)));
        }
    }
    

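    This code could be improved to avoid blocking, but it would still have to cache every result from every Observable, and the code would become considerably more complex. Most likely, if blocking is not acceptable for you, trying to get a Cartesian product is a bad idea anyway.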

  3. # Answer 3

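    First, you need a fixed number of input Observables. Second, blocking is not required, but caching probably is, because the second, third, and later Observables have to be consumed multiple times.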

    import java.util.*;
    
    import io.reactivex.Observable;
    
    public class Cartesian {
    
        static Observable<int[]> cartesian(Observable<Observable<Integer>> sources) {
            return sources.toList().flatMapObservable(list -> cartesian(list));
        }
    
        static Observable<int[]> cartesian(List<Observable<Integer>> sources) {
            if (sources.size() == 0) {
                return Observable.<int[]>empty();
            }
            Observable<int[]> main = sources.get(0).map(v -> new int[] { v });
    
            for (int i = 1; i < sources.size(); i++) {
                int j = i;
                Observable<Integer> o = sources.get(i).cache();
                main = main.flatMap(v -> {
                    return o.map(w -> {
                        int[] arr = Arrays.copyOf(v, j + 1);
                        arr[j] = w;
                        return arr;
                    });
                });
            }
    
            return main;
        }
    
        public static void main(String[] args) {
            cartesian(Observable.just(
                Observable.just(0, 1), 
                Observable.just(2, 3), 
                Observable.just(4, 5)
            ))
            .subscribe(v -> System.out.println(Arrays.toString(v)));
        }
    }
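
The same fold can be sketched with java.util.stream, which may make the caching point easier to see: a Stream can be consumed only once, so every source after the first has to be replayable. In the sketch below a List plays the role that cache() plays above. The class name StreamCartesian is hypothetical, and the sources are assumed to be already collected into lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamCartesian {

    static List<int[]> cartesian(List<List<Integer>> sources) {
        if (sources.isEmpty()) {
            return new ArrayList<>();
        }
        // Start with one-element prefixes taken from the first source.
        Stream<int[]> main = sources.get(0).stream().map(v -> new int[] { v });

        for (int i = 1; i < sources.size(); i++) {
            final int j = i;
            // Replayable source: stream() is called once per prefix.
            final List<Integer> replayable = sources.get(i);
            main = main.flatMap(prefix -> replayable.stream().map(w -> {
                // Extend the prefix by one slot and fill it with the next value.
                int[] arr = Arrays.copyOf(prefix, j + 1);
                arr[j] = w;
                return arr;
            }));
        }
        return main.collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(0, 1), Arrays.asList(2, 3), Arrays.asList(4, 5));
        for (int[] t : cartesian(sources)) {
            System.out.println(Arrays.toString(t));
        }
    }
}
```

Each prefix calls replayable.stream() again, so the second source is traversed once per element of the first, the third once per pair, and so on. That repeated traversal is what the cached Observables have to support in the reactive version.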