有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java Intervals调度的RxJava可观测值比指定的时间要长

在下面的代码中,可观察到的应该每300毫秒触发一次。我通过模拟耗时1秒的背景活动来增加它的趣味性。我本来以为,因为我使用的是一个在下面使用线程池的调度程序,所以可以观察到的间隔将每300毫秒在一个新线程上持续触发一次。相反,所观察到的时间间隔每次都会等待整整一秒钟,然后再次触发。这是你想要的行为吗?如果一项任务花费的时间超过了要求的时间,怎么能强迫它并行启动呢

以下是代码:

Observable
            .interval(300, TimeUnit.MILLISECONDS, Schedulers.io())
                .doOnNext(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        System.out.println("action thread: " + Thread.currentThread());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .map(new Func1<Long, Float>() {
                    @Override
                    public Float call(Long aLong) {
                        final double result = Math.random();
                        return new Float(result);
                    }
                })
                .takeWhile(new Func1<Float, Boolean>() {
                    @Override
                    public Boolean call(Float aFloat) {
                        return aFloat >= 0.01f;
                    }
                })
            .subscribe(new Action1<Float>() {
                @Override
                public void call(Float aFloat) {
                    System.out.println("observing thread: " + Thread.currentThread());
                    System.out.println(aFloat);
                }
            });

共 (1) 个答案

  1. # 1 楼答案

    观察到的事物本质上是连续的,所以如果你像在例子中那样进入睡眠状态,你就阻塞了整个序列。要进行后台计算,必须通过observeOnsubscribeOn将其移动到另一个线程。在这种情况下,您可以flatMap/concatMapEager在另一个可观察对象中进行睡眠,并将结果合并回主序列:

    Observable.interval(300, TimeUnit.MILLISECONDS)
    .flatMap(t -> Observable.fromCallable(() -> { 
            Thread.sleep(1000);
         }).subscribeOn(Schedulers.io()))
    .map(...)
    .takeWhile(...)
    .subscribe(...)