有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java同步多个异步请求

给定一个异步服务,该服务应在完成时关闭。我想执行一个请求的多个实例。所有请求完成后,我想关闭服务。我想知道实现这一目标的最佳方式是什么。下面的代码演示了问题,但没有实际关闭服务:

    class Service implements Closeable {
        public Service() {/*...*/}

        public ListenableFuture<Integer> processRequest(Integer param) {/*...*/}

        @Override
        public void close() {/*...*/}
    }

    public void proccessRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
        }
    }

我正在考虑关闭该服务的不同选项:

  • 以这种方式使用CountDownLatch

    public void processRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        CountDownLatch latch = new CountDownLatch(params.size());
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
            Futures.addCallback(res, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer integer) {
                    latch.countDown();
                    if (latch.getCount() == 0) {
                       svc.close();
                    }
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    latch.countDown();
                    if (latch.getCount() == 0) {
                       svc.close();
                    }
                }
            });
        }
    }
    
  • 以这种方式使用CountDownLatch

    public void processRequests(ArrayList<Integer> params) {
        Service svc = new Service();
        CountDownLatch latch = new CountDownLatch(params.size());
        for (Integer param : params) {
            final ListenableFuture<Integer> res = svc.processRequest(param);
            Futures.addCallback(res, new FutureCallback<Integer>() {
                @Override
                public void onSuccess(Integer integer) {
                    latch.countDown();
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    latch.countDown();
                }
            });
        }
        latch.await();
        svc.close();
    }
    
  • 与第一个选项类似,但使用AtomicInteger

实现这一目标的最佳方式是什么?第一,第二,第三,这些都没有


共 (1) 个答案

  1. # 1 楼答案

    使用CountDownLatch的第一个解决方案看起来不错,但还有其他一些方法

    从版本20.0开始Futures类有一个whenAllComplete方法,就是专门为此设计的。用它你可以写:

    Service svc = new Service();
    Futures.whenAllComplete(params.stream().map(svc::processRequest).collect(Collectors.toList())).call(() -> {
        svc.close();
        return null;
    }, ForkJoinPool.commonPool());
    

    您还可以使用Java 8 CompletableFuture类,该类具有类似的方法allOf

    CompletableFuture.allOf(params.stream().map(svc::processRequest).toArray(CompletableFuture[]::new))
        .thenAccept(v -> svc.close());
    

    但是在这种情况下,您必须使Service返回一个CompletableFuture