0

0

深入理解CompletableFuture:实现任务的顺序执行与结果收集

碧海醫心

碧海醫心

发布时间:2025-08-02 23:02:21

|

569人浏览过

|

来源于php中文网

原创

深入理解completablefuture:实现任务的顺序执行与结果收集

本文旨在探讨如何使用Java的CompletableFuture实现一系列异步任务的顺序执行,并将所有任务的结果收集到一个列表中。我们将分析常见的陷阱,如不当的线程管理和并发执行问题,并提供两种优雅且高效的解决方案,确保任务按预期顺序完成并正确汇总结果。

1. 问题背景与挑战

在异步编程中,CompletableFuture是处理并发任务的强大工具。然而,当面临需要严格顺序执行的异步任务链,并且需要收集每个任务的结果时,可能会遇到一些挑战。例如,业务场景可能要求前一个任务完成后,后一个任务才能开始,同时我们希望将所有任务的计算结果汇总到一个集合中。

考虑一个耗时的业务处理函数,它返回一个CompletionStage

import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class SequentialTaskProcessor {

    private CompletionStage process(int a) {
        return CompletableFuture.supplyAsync(() -> {
            System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a);
            // 模拟长时间运行的业务处理
            try {
                Thread.sleep(10); // 增加延迟以观察效果
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return a + 10;
        }).whenCompleteAsync((e, t) -> {
            if (t != null)
                System.err.printf("!!! error processing '%d' !!!\n", a);
            System.err.printf("%s finish %d\n", LocalDateTime.now(), e);
        });
    }

我们的目标是多次调用process函数,确保它们按顺序执行,并将每次的结果收集到一个List中。

1.1 常见误区:thenApplyAsync与内部join()

一种直观的尝试是使用thenApplyAsync并在其内部调用process(element).toCompletableFuture().join()。

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// ... (process方法同上)

public void firstApproach() {
    List arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
    CompletionStage> result = CompletableFuture.completedFuture(new ArrayList<>());

    for (Integer element : arr) {
        result = result.thenApplyAsync((ret) -> {
            // 在thenApplyAsync内部阻塞等待前一个CompletableFuture完成
            Integer a = process(element).toCompletableFuture().join(); 
            ret.add(a);
            return ret;
        });
    }

    List computeResult = result.toCompletableFuture().join();
    System.out.println("First approach results: " + computeResult);
}

问题分析: 虽然这种方法能够实现顺序执行并收集结果,但它效率低下。thenApplyAsync本身会在一个线程池中执行其回调,而回调内部的process(element).toCompletableFuture().join()又会阻塞这个线程,直到process方法返回的CompletableFuture完成。这意味着一个逻辑步骤可能间接占用两个线程资源(一个用于thenApplyAsync的回调,另一个用于process内部的异步任务),造成线程资源的浪费和不必要的阻塞。观察输出日志,会发现dispatch和finish的时间戳是严格顺序的,但线程利用率不高。

1.2 常见误区:thenCombineAsync的并发陷阱

另一种尝试是使用thenCombineAsync,期望它能将前一个阶段的结果与新任务的结果结合:

// ... (process方法同上)

public void secondApproach() {
    List arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
    CompletionStage> result = CompletableFuture.completedFuture(new ArrayList<>());

    for (Integer element : arr) {
        // process(element) 在这里被立即调用,而非等待前一个阶段完成
        result = result.thenCombineAsync(process(element), (array, ret) -> { 
            array.add(ret); 
            return array; 
        });
    }

    List computeResult = result.toCompletableFuture().join();
    System.out.println("Second approach results: " + computeResult);
}

问题分析: 这种方法会导致任务并发执行,而非顺序执行。thenCombineAsync的第二个参数CompletionStage other在方法调用时就会被评估并启动。这意味着在循环中,所有的process(element)调用几乎是同时发起的,它们会并发执行。观察输出日志,会发现dispatch的时间戳是交错的,这违反了顺序执行的要求。thenCombineAsync适用于两个独立的异步任务都完成后再进行合并的场景,而不是链式顺序执行的场景。

2. 解决方案:顺序链式执行与结果收集

为了实现任务的顺序执行并高效地收集结果,我们需要利用CompletableFuture提供的更高级的组合方法,特别是thenCompose。

2.1 方案一:使用外部列表收集结果

这种方法通过thenCompose确保任务顺序执行,并使用thenAccept将结果添加到循环外部维护的列表中。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// ... (process方法同上)

public class SequentialTaskProcessor {
    // ... process 方法 ...

    public void solutionOne() {
        List arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());

        // 初始化一个表示链式操作开始的CompletableFuture,其结果类型为Void
        CompletionStage loopStage = CompletableFuture.completedFuture(null);
        final List resultList = new ArrayList<>(); // 外部结果列表

        for (Integer element : arr) {
            loopStage = loopStage
                    // thenCompose确保前一个阶段完成后,才执行process(element)
                    .thenCompose(v -> process(element)) 
                    // thenAccept将process的结果添加到外部列表中,并返回CompletionStage
                    .thenAccept(resultList::add); 
        }

        // 阻塞等待所有任务完成
        loopStage.toCompletableFuture().join(); 

        System.out.println("Solution One results: " + resultList);
    }

    public static void main(String[] args) {
        SequentialTaskProcessor processor = new SequentialTaskProcessor();
        System.out.println("--- Running Solution One ---");
        processor.solutionOne();
        System.out.println("\n--- Running Solution Two ---");
        processor.solutionTwo();
    }
}

原理详解:

Sheet+
Sheet+

Excel和GoogleSheets表格AI处理工具

下载
  1. CompletionStage loopStage = CompletableFuture.completedFuture(null);:我们从一个已完成的CompletableFuture开始,其结果类型为Void。这提供了一个初始的“钩子”来启动任务链。
  2. loopStage = loopStage.thenCompose(v -> process(element)):thenCompose是这里的关键。它接收一个函数,该函数返回一个新的CompletionStage。这意味着process(element)只会在loopStage(即前一个任务)完成后才会被调用并开始执行。这确保了任务的严格顺序性。thenCompose的作用是将CompletionStage(来自loopStage)和CompletionStage(来自process)的结果扁平化为一个新的CompletionStage
  3. .thenAccept(resultList::add):在process(element)完成并产生结果后,thenAccept会异步地将该结果添加到resultList中。thenAccept本身返回一个CompletionStage,这使得loopStage可以继续作为链的下一个开始点,而不必传递一个累积的列表。
  4. loopStage.toCompletableFuture().join():最后,我们阻塞等待整个任务链的最终阶段完成。此时,resultList将包含所有任务的顺序结果。

这种方法简洁且高效,避免了不必要的阻塞和线程浪费。

2.2 方案二:在链中传递并累积列表

另一种方法是在CompletableFuture链中直接传递并累积结果列表。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// ... (process方法同上)

public class SequentialTaskProcessor {
    // ... process 方法 ...

    public void solutionTwo() {
        List arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());
        // 初始化一个携带空列表的CompletableFuture
        CompletionStage> listStage = CompletableFuture.completedFuture(new ArrayList<>());

        for (Integer element : arr) {
            listStage = listStage
                    // thenCompose确保前一个阶段完成后,才执行process(element)
                    .thenCompose(list -> process(element) 
                            // thenAccept将process的结果添加到当前列表
                            .thenAccept(list::add) 
                            // thenApply将CompletionStage转换回CompletionStage>
                            .thenApply(v -> list) 
                    );
        }

        // 阻塞等待所有任务完成,并获取最终的列表
        List resultList = listStage.toCompletableFuture().join(); 

        System.out.println("Solution Two results: " + resultList);
    }
    // ... main 方法 ...
}

原理详解:

  1. CompletionStage> listStage = CompletableFuture.completedFuture(new ArrayList());:我们从一个包含空列表的CompletableFuture开始,这个列表将作为结果的累积器。
  2. listStage = listStage.thenCompose(list -> ...):同样使用thenCompose来确保顺序执行。这里的list参数是前一个阶段传递过来的结果列表。
  3. process(element).thenAccept(list::add):在thenCompose的函数内部,我们启动process(element)任务。当它完成时,使用thenAccept将结果添加到当前list中。
  4. .thenApply(v -> list):这是关键一步。thenAccept返回的是CompletionStage,但为了将list传递给下一个迭代,我们需要将其结果类型转换回CompletionStage>。thenApply(v -> list)实现了这一点:它在thenAccept完成后被调用,并简单地返回当前的list对象,从而将列表传递给链中的下一个thenCompose。
  5. List resultList = listStage.toCompletableFuture().join();:最终,整个链完成时,listStage的结果就是包含了所有累积结果的列表。

3. 总结与注意事项

两种解决方案都能够有效地实现异步任务的顺序执行和结果收集,并且都避免了线程阻塞和并发执行的问题。

  • 方案一(外部列表)
    • 优点:代码逻辑相对直观,loopStage只关心任务的完成状态(Void),结果列表在外部维护。
    • 适用场景:当任务链的中间结果不需要在CompletableFuture链中传递,只需最终汇总时。
  • 方案二(链中传递列表)
    • 优点:结果列表直接作为CompletableFuture链的一部分进行传递和累积,整个操作封装在一个CompletableFuture中,最终结果直接从CompletableFuture获取。
    • 适用场景:当需要将累积的结果作为链中下一个任务的输入,或者更倾向于将所有状态变化封装在CompletableFuture链内部时。

注意事项:

  • 异常处理:在实际应用中,需要为CompletableFuture链添加适当的异常处理机制,例如使用exceptionally、handle等方法来处理任务执行过程中可能出现的错误。
  • 线程池管理:CompletableFuture默认使用ForkJoinPool.commonPool()。对于长时间运行或IO密集型任务,建议为supplyAsync、thenApplyAsync等方法指定自定义的Executor,以更好地控制线程资源,避免阻塞公共线程池。
  • 任务原子性:确保process方法内部的业务逻辑是线程安全的,如果它操作共享资源,需要额外的同步机制。本文的重点在于CompletableFuture的链式调用,而非process方法本身的线程安全性。

通过理解thenCompose的扁平化特性和thenAccept/thenApply的组合使用,我们可以更灵活、高效地构建复杂的异步任务流,满足各种顺序执行和结果收集的需求。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

237

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

499

2024.03.01

javascriptvoid(o)怎么解决
javascriptvoid(o)怎么解决

javascriptvoid(o)的解决办法:1、检查语法错误;2、确保正确的执行环境;3、检查其他代码的冲突;4、使用事件委托;5、使用其他绑定方式;6、检查外部资源等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

177

2023.11.23

java中void的含义
java中void的含义

本专题整合了Java中void的相关内容,阅读专题下面的文章了解更多详细内容。

102

2025.11.27

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

546

2023.08.10

C++类型转换方式
C++类型转换方式

本专题整合了C++类型转换相关内容,想了解更多相关内容,请阅读专题下面的文章。

304

2025.07.15

go语言 注释编码
go语言 注释编码

本专题整合了go语言注释、注释规范等等内容,阅读专题下面的文章了解更多详细内容。

32

2026.01.31

go语言 math包
go语言 math包

本专题整合了go语言math包相关内容,阅读专题下面的文章了解更多详细内容。

23

2026.01.31

go语言输入函数
go语言输入函数

本专题整合了go语言输入相关教程内容,阅读专题下面的文章了解更多详细内容。

16

2026.01.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 4.5万人学习

Pandas 教程
Pandas 教程

共15课时 | 1万人学习

ASP 教程
ASP 教程

共34课时 | 4.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号