有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

热观测的java RxJava延迟

我看到了这个问题。 这是关于实现每个发射项目的延迟。这是如何基于accepted answer实现的:

Observable.zip(Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
  System.out.println(System.currentTimeMillis() - timeNow);
  System.out.println(item);
  System.out.println(" ");
}).toList().toBlocking().first();

在这个问题中,提问者特别要求一组固定的观察值(Observable.range(1,5)),不幸的是,这不是我想要实现的

我也看到了这个comment

这正是我想要实现的。因此,我的source observable以比间隔更慢(有时更快)的速度发射项目。此外,可观测到的辐射是永无止境的

===

所以基本上我希望热观测的延迟最小

例如,如果我想要400毫秒的最小延迟,并且我有这种可观测的发射度:

X1-100ms delay-X2-200ms delay-X3-600ms delay-X4-20000ms delay-X5-...

我想让它屈服:

X1-400ms delay-X2-400ms delay-X3-600ms delay-X4-20000ms delay-X5-...

有人有办法做到这一点吗


共 (1) 个答案

  1. # 1 楼答案

    你的要求太奇怪了

    我能解决,但不优雅。这是我的代码:

    class Three<A, B, C> {
        A a;
        B b;
        C c;
        // Getter, Setter, Constructor
      }
    
      public static void main(String[] args) throws Exception {
        BehaviorSubject<Integer> s = BehaviorSubject.create();
        // Three = (The value, upstream comes mills, downstream emits mills)
        s.map(i -> new Three<>(i, System.currentTimeMillis(), System.currentTimeMillis()))
            .scan((a, b) -> {
              b.setC(a.getC() + Math.max(400L, b.getB() - a.getB()));
              return b;
            })
            .concatMap(i -> Observable.just(i.getA()).delay(Math.max(0, i.getC() - System.currentTimeMillis()),
                TimeUnit.MILLISECONDS))
            .subscribe(i -> System.out.println(i + "\t" + System.currentTimeMillis()));
        s.onNext(0);
        Thread.sleep(100);
        s.onNext(1);
        Thread.sleep(200);
        s.onNext(2);
        Thread.sleep(600);
        s.onNext(3);
        Thread.sleep(2000);
        s.onNext(4);
        Thread.sleep(200);
        s.onNext(5);
        Thread.sleep(800);
        s.onNext(6);
        Thread.sleep(1000);
      }
    

    和输出

    0   1510128693984
    1   1510128694366 // 400ms
    2   1510128694766 // 400ms
    3   1510128695366 // 600ms
    4   1510128697366 // 2000ms
    5   1510128697766 // 400ms
    6   1510128698567 // 800ms