有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java CompletableFuture:转换与组合

请考虑下面的例子:《现代java在行动》一书(第二版,清单16.16,第405页)< EEM >。在那里,我们有三个map操作,从一个流中的所有商店获取产品的折扣价格列表。首先,我们联系每个商店以获取包含非折扣价格和折扣类型的响应,然后将响应解析为Quote对象,并将其传递给远程折扣服务,该服务返回一个包含已折扣价格的字符串

public List<String> findPrices(String product) {

    List<CompletableFuture<String>> priceFutures =
        shops.stream()
            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenCompose(quote ->
                    CompletableFuture.supplyAsync(
                        () -> Discount.applyDiscount(quote), executor)))
            .collect(toList());

    return priceFutures.stream()
            .map(CompletableFuture::join)
            .collect(toList());
}

我的问题不是关于thenApplythenCompose之间的区别。我相信,后者用于避免像CompletableFuture<CompletableFuture<...>>这样的嵌套结构。但我不明白的是,为什么我们需要在这里创建另一个级别的CompletableFuture?似乎作者通过创建并展平嵌套的CompletableFuture,而不是简单地在第三个映射中使用thenApplyAsync,为代码增加了一些人为的复杂性:

            .map(shop -> CompletableFuture.supplyAsync(
                    () -> shop.getPrice(product), executor))
            .map(future -> future.thenApply(Quote::parse))
            .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))

这两种映射用法(原始的带有thenCompose和带有thenApplyAsync)等效吗?两者都接受上一个映射的结果作为参数,都提供自定义执行器来执行任务,并且都返回相同的CompletableFuture<String>结果


共 (1) 个答案

  1. # 1 楼答案

    是的,thenComposesupplyAsync实现了与直接使用thenApplyAsync相同的效果

    我没有读过这本书,但可能是一些示例代码关注于某个主题或特性,而不是最简洁或最快的代码。因此,假设您正在考虑使用类似的代码,我会给出一些建议


    关于这段代码的另一个建议是,通过对map的连续调用来链接每个CompletableFuture有点奇怪。当前的示例似乎是在以前基于Stream的方法的基础上构建的,该方法具有多个调用,并且保持原样,但使用了CompletableFuture

    我更喜欢单个map并直接链接每个CompletableFuture,这也允许将其重构为自己的方法

    因此:

                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice(product), executor))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenApplyAsync(Discount::applyDiscount, executor))
    

    将变成这样:

                .map(shop ->
                    CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)
                    .thenApply(Quote::parse)
                    .thenApplyAsync(Discount::applyDiscount, executor))
    

    这个lambda很容易变成一个方法,在没有Stream的情况下可以重用,它可以与另一个CompletableFuture组合,可以测试,可以模拟,等等


    另一个建议是使您的代码始终异步,这样findPrices就不会阻塞join(或者get

    阻塞的问题是它可能阻塞执行器上最后一个可用的线程,从而通过线程耗尽引发死锁。最终需要在执行器上运行的代码所依赖的异步代码可能永远不会运行

    public CompletableFuture<List<String>> findPricesAsync(String product) {
        // List<CompletableFuture<String>> priceFutures = ...
    
        CompletableFuture<Void> all = CompletableFuture.allOf(priceFutures.toArray(new CompletableFuture<String>[priceFutures.size()]));
        return all.thenRun(() -> priceFutures.stream()
            .map(CompletableFuture::join));
    }
    

    注意,返回类型从List<String>更改为CompletableFuture<List<String>>。还要注意,对join的最后一次调用不会阻塞,因为调用它的每个CompletableFuture都已完成


    最后,我倾向于返回CompletionStage,因为它允许除CompletableFuture之外的假设实现。我还假设返回的对象也实现了Future,这允许对结果使用get,但不使用join,区别在于声明的抛出异常类型

    在一个例子中,我让类NIO的方法为异步I/O返回CompletionStage,我实现了一个CompletableFuture的子类,覆盖每个*Async方法中使用的默认执行器,该方法没有执行器参数。自Java9以来,这变得更容易了,仍然是通过子类化实现的,但它只需要重写defaultExecutor。我将其子类化的主要原因是,使用组合的替代方法将导致更多的代码(包装结果等等)。另一个原因,但不是真正让我担心的,是每个实例都有一个额外的对象要被垃圾收集

    这只是为了证明,在某些情况下,可能确实需要定制CompletionStage实现,这些实现可能是CompletableFuture的子类,也可能不是