有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

项目反应器(或RxJava2)的java执行器序列调用

我有以下任务,我想用ProjectReactor(或RxJava)解决它

这是事件的根源。每个事件由serviceId和一些有效负载组成。一旦收到事件,我们需要对指定的serviceId和负载执行操作。但我们应该确保对同一serviceId的两个请求之间的时间间隔必须大于或等于1秒。但对差异服务的请求可以并行执行

我们还应该注意,服务的计数是动态的

如下图所示

enter image description here

目前我有以下代码:

Flux.create((sink-> eventProvider.listen(new EventListner(){
                public void event(req) {
                    sink.next(req);
                }
            })))
        /* need some logic here */
        .flatMap(req -> requestExecutor.execute(req))
        .doOnNext(res -> responseProcessor.process(res))
        .subscribe();

你有什么想法吗


共 (2) 个答案

  1. # 1 楼答案

    Flux.groupBy()将在这种情况下帮助您。 操作员使用映射器函数创建关键点,并根据关键点将发射的元素组合在一起。可以将Service ID视为一个键。

    Flux.create((sink-> eventProvider.listen(new EventListner(){
                public void event(req) {
                    sink.next(req);
                }
            })))
        .groupBy(req -> req.getServiceId()) //group req by serviceId
        .flatMap(reqGroup-> reqGroup..delayElements(Duration.ofSeconds(1)) //add minimum delay to the group
        .flatMap(req -> requestExecutor.execute(req))
        .doOnNext(res -> responseProcessor.process(res))
        .subscribe();
    

    您还可以根据serviceId添加不同的延迟。以下面的代码片段为例——偶数整数将延迟2秒,奇数将延迟1秒

        Flux.range(1, 20)
        .groupBy(integer -> integer % 2)
        .flatMap(integerGroupedFlux -> {
          Flux<Integer> integerFlux;
          if (integerGroupedFlux.key() == 0) { //even integers
            integerFlux = integerGroupedFlux.delayElements(Duration.ofSeconds(2));
          } else {
            integerFlux = integerGroupedFlux.delayElements(Duration.ofSeconds(1));
          }
          return integerFlux;
        })
        .subscribe(System.out::println);
    
  2. # 2 楼答案

    如果事件标识了它们发起调用的服务,那么可以使用groupBy()操作符按服务分离流。要在每个服务请求之后引入延迟,请使用flatMap()和一个参数来单线程使用

    在RxJava中:

    observable
      .groupBy(event -> getServiceId( event )) // 1
      .flatMap(serviceObservable -> // 2
           serviceObservable // 3
             .flatMap( ev -> service(serviceObservable.getKey(), ev), 1) // 4
                               .delay(1, TimeUnit.SECONDS)) // 5
      .subscribe();
    
    1. 按他们将使用的服务对事件进行分组。该ID稍后将作为密钥存储在服务器上。当遇到新服务ID时,这将发出新项
    2. serviceObservable是一个GroupByObservable将在下面处理
    3. 从这个可观测到的每一次发射都是一个事件,应该是一次服务
    4. serviceObservable.getKey()返回要使用的服务的ID。我发明了一种方法service(),它通过服务的ID向服务发送事件。此外,参数1告诉flatMap()单线程执行该操作,因此一次只能发生一个服务请求
    5. delay()(或您想要的任何操作符)将等待一秒钟,然后释放操作

    (免责声明:此代码未经测试,但我在过去的项目中也做过类似的调度,所以基本思想是正确的。)