有 Java 编程相关的问题?

你可以在下面搜索框中键入要查询的问题!

基于CompletableFuture的java任务调度

我正在通过CompletableFuture API学习并发性。假设我有两个任务:一个需要250毫秒,另一个需要2500毫秒。在以下代码中:

        Supplier<List<Long>> supplyIds = () -> {
            sleep(200);
            return(Arrays.asList(1L, 2L, 3L));
        };

        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers1 = idList -> {
            sleep(250);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () ->  idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Function<List<Long>, CompletableFuture<List<User>>> fetchUsers2 = idList -> {
            sleep(2500);
            System.out.println("User2"+ Thread.currentThread().getName());
            Supplier<List<User>> userSupplier = () -> idList.stream().map(User::new).collect(Collectors.toList());
            return(CompletableFuture.supplyAsync(userSupplier));
        };
        Consumer<List<User>> displayer = users -> {
            users.forEach(System.out::println);
        };

        CompletableFuture<List<Long>> completableFuture = CompletableFuture.supplyAsync(supplyIds);
        CompletableFuture<List<User>> users1 = completableFuture.thenCompose(fetchUsers1);
        CompletableFuture<List<User>> users2 = completableFuture.thenCompose(fetchUsers2);

        users1.thenRun(()-> System.out.println("User 1"));
        users2.thenRun(()-> System.out.println("User 2"));
        users1.acceptEither(users2, displayer);

        sleep(6000);

我得到以下结果:

User2ForkJoinPool.commonPool-worker-1
User 2
1
2
3
User2ForkJoinPool.commonPool-worker-1
User 1

我知道代码是同步运行的,因为使用的是同一个公共fork-join-pool线程,我们没有指定线程。我不明白为什么要先执行fetchUsers2任务,然后再执行fetchUsers1任务(这似乎与每次运行一致)。我假设由于thenCompose在代码中首先被fetchUsers1调用,所以它将首先被“排队”


共 (0) 个答案