java理解RxJava在底层数据源具有新值时是可观察的
我正在尝试RxJava可观察和观察程序代码。我的目标是检查底层源接收到新数据值时的工作方式。我的代码如下:
List<Integer> numbers = new ArrayList<>();
Runnable r = new Runnable() {
@Override
public void run() {
int i = 100;
while(i < 110) {
numbers.add(i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
};
numbers.add(0);
numbers.add(1);
numbers.add(2);
Observable.fromIterable(numbers)
.observeOn(Schedulers.io())
.subscribe(i -> System.out.println("Received "+i+ " on "+ Thread.currentThread().getName()),
e -> e.printStackTrace());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Thread t = new Thread(r);
t.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
所以我有一个数字列表。然后,我有一个runnable,它在列表中添加新的数字,添加之间有一个时间间隔。我还没开始写呢。我将0,1,2添加到列表中,然后使用它创建一个可观察对象,在池中的线程上调度该观察者,最后订阅该可观察对象。当订阅发生时,observable发出值0,1,2,并调用observer(执行传递给订阅的lambda)。然后在主线程上引入1秒的延迟,然后使用前面创建的runnable生成一个新线程,并添加最后的延迟,这样应用程序就不会立即退出
我所期望的是,当新的数字添加到列表中时,必须调用observer,从而打印消息。但事实并非如此。我的理解肯定错了。我是否还需要在调度器上放置observable
# 1 楼答案
^{} 方法是每次生成订阅时可观察值的“一次性”加载。构建订阅之后发生的事情不再有影响。当使用带有} 方法时,您将看到订阅已被完全使用,并且三个初始值已被打印
onComplete
参数的^{您可以使用^{} (类似于^{} )方法,在
onNext()
方法中添加“新值”,而之前生成的订阅仍然处于活动状态(且未完成)。这样,您可以首先构建订阅,并继续调用onNext()
以获取主题中的新值,直到完成并调用onCompleted()