有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java为什么是反应堆的Mono。fromCompletionStage比普通的可完成未来慢?

我有一段简单的代码,可以在后台“处理”数据,在每个nth项之后,记录在最后n项上花费的总时间:

class BackgroundWorker implements AutoCloseable {
  private final ExecutorService thread = Executors.newSingleThreadExecutor();
  private final int reportEvery;
  private int processed;
  private LocalTime begin;

  BackgroundWorker(int reportEvery) {
    this.reportEvery = reportEvery;
  }

  CompletableFuture<Boolean> process(int item) {
    var future = new CompletableFuture<Boolean>();
    thread.submit(() ->  {
      try {
        if (processed == 0) {
          begin = LocalTime.now();
        }
        if (++processed == reportEvery) {
          System.out.format("Processed %d items in %dms%n",
              processed, ChronoUnit.MILLIS.between(begin, LocalTime.now()));
          processed = 0;
        }
        future.complete(true);
      } catch (Exception ex) {
        future.complete(false);
      }
    });
    return future;
  }

  @Override
  public void close() {
    thread.shutdownNow();
  }
}

然后我有一个Flux将数据输入BackgroundWorker,计算成功完成的CompletableFuture

Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100000).boxed());
try (var worker = new BackgroundWorker(10000)) {
  int successCount = numbers
      .map(worker::process)
      .map(future -> future.thenApply(success -> success ? 1 : 0))
      .reduce(
          CompletableFuture.completedFuture(0),
          (acc, curr) -> acc.thenCombine(curr, Integer::sum))
      .block()
      .join();

  System.out.println("Done; success: " + successCount);
}

和同一段代码,但现在使用Mono.fromCompletionStage

int successCount = numbers
    .map(worker::process)
    .map(Mono::fromCompletionStage)
    .map(mono -> mono.map(success -> success ? 1 : 0))
    .reduce(
        Mono.just(0),
        (acc, curr) -> acc.zipWith(curr, Integer::sum))
    .block()
    .block();

第一个使用futures的应用程序将打印以下内容:

Processed 10000 items in 48ms
Processed 10000 items in 17ms
Processed 10000 items in 10ms
Processed 10000 items in 8ms
Processed 10000 items in 9ms
Processed 10000 items in 5ms
Processed 10000 items in 5ms
Processed 10000 items in 4ms
Processed 10000 items in 3ms
Processed 10000 items in 4ms
Done; success: 100000

但是使用Mono.fromCompletionStage的版本打印:

Processed 10000 items in 138ms
Processed 10000 items in 253ms
Processed 10000 items in 327ms
Processed 10000 items in 477ms
Processed 10000 items in 315ms
Processed 10000 items in 379ms
Processed 10000 items in 448ms
Processed 10000 items in 509ms
Processed 10000 items in 595ms
Processed 10000 items in 668ms
Done; success: 100000

为什么使用Mono而不是CompletableFuture会严重降低性能


共 (1) 个答案

  1. # 1 楼答案

    似乎Mono的压缩占用了最多的时间,并以某种方式影响了执行。可能是因为这样的压缩每次都会创建一个新的MonoZip实例

    但在这一点上,你不必使用减缩和拉链。更惯用的做法是flatMap在monos中,获得一个Flux<Integer>可以在不创建中间垃圾的情况下减少

    此外,由于futures基本上是在创建时开始处理的,所以您可以做一个更简单的concatMap(开销更少,而且此时必须等待每个mono的完成并不重要,因为所有futures都已经在后台运行了):

    Flux<Integer> numbers = Flux.fromStream(IntStream.range(0, 100_000).boxed());
    try (BackgroundWorker worker = new BackgroundWorker(10000)) {
        int successCount = numbers
                .map(worker::process)
                .concatMap(future -> Mono.fromCompletionStage(future))
                .map(success -> success ? 1 : 0)
                .reduce(0, Integer::sum)
                .block();
    
        System.out.println("Done; success: " + successCount);
    }
    

    通过避免从boolean到int的映射,并在reduce中这样做,您甚至可以节省更多的开销:

    .reduce(0, (acc, bool) -> bool ? acc + 1 : acc)