有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

java使用AsyncRestTemplate多次生成API,并等待所有操作完成

我必须使用RestTemplate多次使用不同的参数进行RESTAPI调用。API是相同的,但它是正在更改的参数。次数也是可变的。我想使用AsyncRestTemplate,但我的主线程应该等到所有API调用成功完成。我还想处理每个API调用返回的响应。目前我正在使用RestTemplate。基本形式如下

List<String> listOfResponses = new ArrayList<String>();
for (Integer studentId : studentIdsList) {
    String respBody;
    try {
        ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, requestEntity, String.class);
    } catch (Exception ex) {
        throw new ApplicationException("Exception while making Rest call.", ex);
    }
    respBody = requestEntity.getBody();
    listOfResponses.add(respBody);          
}

在这种情况下,如何实现AsyncRestTemplate


共 (3) 个答案

  1. # 1 楼答案

    如果对您来说可行,您可以使用Java 8流API:

    List<String> listOfResponses = studentIdsList.stream()
        .parrallel()
        .map({studentId ->
            ResponseEntity<String> responseEntity = restTemplate.exchange(url, method, studentId, String.class);
            return responseEntity.getBody();
        })
        .collect(Collectors.toList());
    

    此代码基本上将执行两件事:

    1. 并行执行请求
    2. 将请求的结果收集到列表中

    更新:同意@Didier L-当您需要执行大量请求时,此解决方案可能无法正常工作。以下是更新版本:

    List<String> listOfResponses  = studentIdsList.stream()
                    .map(studentId -> asyncRestTemplate.exchange(url, method, studentId, String.class)
                    .collect(Collectors.toList()).stream()
                    .map(this::retrieveResult)
                    .collect(Collectors.toList());
    
        /**
         * Retrieves results of each request by blocking the main thread. Note that the actual request was performed on the previous step when
         * calling asyncRestTemplate.exchange(url, method, studentId, String.class)
         */
        private String retrieveResult(ListenableFuture<ResponseEntity<String>> listenableFuture) {
            try {
                return listenableFuture.get().getBody();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
  2. # 2 楼答案

    使用AsyncRestTemplate(实际上是任何异步API)的主要思想是第一次发送所有请求,保留相应的未来,然后在第二次处理所有响应。只需使用2个循环即可完成此操作:

    List<ListenableFuture<ResponseEntity<String>>> responseFutures = new ArrayList<>();
    for (Integer studentId : studentIdsList) {
        // FIXME studentId is not used
        ListenableFuture<ResponseEntity<String>> responseEntityFuture = restTemplate.exchange(url, method, requestEntity, String.class);
        responseFutures.add(responseEntityFuture);
    }
    // now all requests were send, so we can process the responses
    List<String> listOfResponses = new ArrayList<>();
    for (ListenableFuture<ResponseEntity<String>> future: responseFutures) {
        try {
            String respBody = future.get().getBody();
            listOfResponses.add(respBody);
        } catch (Exception ex) {
            throw new ApplicationException("Exception while making Rest call.", ex);
        }
    }
    

    注意:如果需要将响应与原始请求配对,可以使用映射或请求+响应对象列表替换未来列表

    我还注意到你的问题中没有使用studentId

  3. # 3 楼答案

    这里是我想建议的另一个解决方案,它使用Spring的RestTemplate而不是AsyncRestTemplate。它还使用Java8 CompletableFuture

    public void sendRequestsAsync(List<Integer> studentList) {
        List<CompletableFuture<Void>> completableFutures = new ArrayList<>(studentList.size()); //List to hold all the completable futures
        List<String> responses = new ArrayList<>(); //List for responses
        ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        for (Integer studentId : studentList) { //Iterate student list
            CompletableFuture<Void> requestCompletableFuture = CompletableFuture
                    .supplyAsync(
                            () -> restTemplate.exchange("URL/" + studentId, HttpMethod.GET, null, String.class),
                            yourOwnExecutor
                    )//Supply the task you wanna run, in your case http request
                    .thenApply((responseEntity) -> {
                        responses.add(responseEntity.getBody());
                        return responseEntity;
                    })//now you can add response body to responses
                    .thenAccept((responseEntity) -> {
                        doSomeFinalStuffWithResponse(responseEntity);
                    })//here you can do more stuff with responseEntity (if you need to)
                    .exceptionally(ex -> {
                        System.out.println(ex);
                        return null;
                    });//do something here if an exception occurs in the execution;
    
            completableFutures.add(requestCompletableFuture);
        }
    
        try {
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).get(); //Now block till all of them are executed by building another completablefuture with others.
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    

    我更喜欢这个解决方案,因为我可以链接我想要的任意多的业务逻辑,而不必依赖Spring内部的异步发送。 显然,您可以进一步清理代码,我现在还没有太多的注意到这一点