有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

Java 8中并发异步方法后跟并行执行方法

在花了一天时间学习java并发API之后,我仍然不太明白如何在CompletableFuture和ExecutorService类的帮助下创建以下功能:

当我在REST端点上收到请求时,我需要:

  1. 启动一个异步任务(包括DB查询、筛选等),它将在最后给我一个字符串URL列表
  2. 同时,用HTTP OK回复REST调用方,请求已收到,我正在处理它
  3. 异步任务完成后,我需要将HTTP请求(使用REST调用方提供的负载)发送到我从作业中获得的URL。最多URL的数量大约是100个,所以我需要这些并行发生
  4. 理想情况下,我有一些同步计数器来计算有多少http请求成功/失败,我可以将此信息发送回REST调用方(我需要发送回的URL在请求负载中提供)

我有构建块(方法如:getMatchingObjectsFromDB(callerPayload)、GetURL(ResultToGetMachingObjects)、sendHttpRequest(Url、methodType)等…)我已经为这些写过了,我只是不太明白如何将步骤1和步骤3结合在一起。我将在步骤1中使用CompletableFuture.supplyAsync(),然后我将需要CompletableFuture.thenComponse方法来开始步骤3,但我不清楚如何使用此API实现并行。但是ExecutorService executor = Executors.newWorkStealingPool();非常直观,它根据可用的处理能力创建线程池,并且可以通过invokeAll()方法提交任务

我怎样才能同时使用CompletableFutureExecutorService?或者,如何保证使用CompletableFuture并行执行任务列表?演示代码片段将不胜感激。谢谢


共 (2) 个答案

  1. # 1 楼答案

    您应该使用join()等待所有线程完成

    创建Map<String, Boolean> result以存储请求结果

    在控制器中:

    public void yourControllerMethod() {
    
      CompletableFuture.runAsync(() -> yourServiceMethod());
    }
    

    为您服务:

    // Execute your logic to get List<String> urls
    
    List<CompletableFuture> futures = urls.stream().map(v -> 
     CompletableFuture.supplyAsync(url -> requestUrl(url))
               .thenAcceptAsync(requestResult -> result.put(url, true or false))
    ).collect(toList()); // You have list of completeable future here
    

    然后使用.join()等待所有线程(请记住,您的服务已经在自己的线程中执行)

    CompletableFuture.allOf(futures).join();
    

    然后,您可以通过访问result映射来确定哪一个成功/失败

    编辑

    请张贴您的程序代码,以便其他人也能理解您

    我已经阅读了您的代码,以下是所需的修改:

    When this for loop was not commented out, the receiver webserver got the same request twice, I dont understand the purpose of this for loop.

    对不起,在我之前的回答中,我没有清理它。这只是我脑子里的一个临时想法,最后我忘了删除:D

    只需从代码中删除它

    // allOf() only accepts arrays, so the List needed to be converted /* The code never gets over this part (I know allOf() is a blocking call), even long after when the receiver got the HTTP request

       with the correct payload. I'm not sure yet where exactly the code gets stuck */
    

    您的映射应该是ConcurrentHashMap,因为您稍后将同时修改它

    Map<String, Boolean> result = new ConcurrentHashMap<>();
    

    如果您的代码仍然不能按预期工作,我建议删除parallelStream()部分

    CompletableFutureparallelStream使用公共forkjoin池。我想游泳池已经用完了

    您应该为您的CompletableFuture创建自己的池:

    Executor pool = Executors.newFixedThreadPool(10);
    

    并使用该池执行您的请求:

    CompletableFuture.supplyAsync(YOURTASK, pool).thenAcceptAsync(Yourtask, pool)
    
  2. # 2 楼答案

    为了便于完成,在清理和测试之后,这里是代码的相关部分(感谢Mạ新罕布什尔州奎伊ế唐古伊ễn) :

    Rest控制器类:

    @POST
    @Path("publish")
    public Response publishEvent(PublishEvent eventPublished) {
        /*
            Payload verification, etc.
        */
    
        //First send the event to the right subscribers, then send the resulting hashmap<String url, Boolean subscriberGotTheRequest> back to the publisher
        CompletableFuture.supplyAsync(() -> EventHandlerService.propagateEvent(eventPublished)).thenAccept(map -> {
          if (eventPublished.getDeliveryCompleteUri() != null) {
            String callbackUrl = Utility
                .getUri(eventPublished.getSource().getAddress(), eventPublished.getSource().getPort(), eventPublished.getDeliveryCompleteUri(), isSecure,
                        false);
            try {
              Utility.sendRequest(callbackUrl, "POST", map);
            } catch (RuntimeException e) {
              log.error("Callback after event publishing failed at: " + callbackUrl);
              e.printStackTrace();
            }
          }
        });
    
        //return OK while the event publishing happens in async
        return Response.status(Status.OK).build();
    }
    

    服务类别:

    private static List<EventFilter> getMatchingEventFilters(PublishEvent pe) {
        //query the database, filter the results based on the method argument
    }
    
    private static boolean sendRequest(String url, Event event) {
        //send the HTTP request to the given URL, with the given Event payload, return true if the response is positive (status code starts with 2), false otherwise
    }
    
    static Map<String, Boolean> propagateEvent(PublishEvent eventPublished) {
        // Get the event relevant filters from the DB
        List<EventFilter> filters = getMatchingEventFilters(eventPublished);
        // Create the URLs from the filters
        List<String> urls = new ArrayList<>();
        for (EventFilter filter : filters) {
          String url;
          try {
            boolean isSecure = filter.getConsumer().getAuthenticationInfo() != null;
            url = Utility.getUri(filter.getConsumer().getAddress(), filter.getPort(), filter.getNotifyUri(), isSecure, false);
          } catch (ArrowheadException | NullPointerException e) {
            e.printStackTrace();
            continue;
          }
          urls.add(url);
        }
    
        Map<String, Boolean> result = new ConcurrentHashMap<>();
        Stream<CompletableFuture> stream = urls.stream().map(url -> CompletableFuture.supplyAsync(() -> sendRequest(url, eventPublished.getEvent()))
                                                                                     .thenAcceptAsync(published -> result.put(url, published)));
        CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)).join();
        log.info("Event published to " + urls.size() + " subscribers.");
        return result;
    }
    

    调试这段代码比通常要困难一些,有时代码会神奇地停止。为了解决这个问题,我只将绝对必要的代码部分放入异步任务中,并确保任务中的代码使用线程安全的东西。而且,一开始我是个哑巴,EventHandlerService.class中的方法使用了synchronized关键字,这导致服务类方法中的CompletableFuture无法执行,因为它默认使用线程池

    A piece of logic marked with synchronized becomes a synchronized block, allowing only one thread to execute at any given time.