0

0

Java中利用CompletableFuture高效并行处理大型列表数据

心靈之曲

心靈之曲

发布时间:2025-07-28 15:42:18

|

1056人浏览过

|

来源于php中文网

原创

Java中利用CompletableFuture高效并行处理大型列表数据

本文深入探讨了在Java中如何利用CompletableFuture和ExecutorService高效并行处理大型列表数据。针对将耗时操作并行化的常见需求,文章分析了在并行处理中可能遇到的陷阱,特别是过早调用CompletableFuture::join导致任务串行执行的问题。通过提供正确的并行处理策略和示例代码,指导读者实现真正的并发执行,并有效聚合结果,从而显著提升数据处理性能。

1. 引言:并行处理大型列表的必要性

在现代数据处理场景中,我们经常需要对包含数万甚至数十万条记录的大型列表执行耗时操作,例如网络请求、数据库查询、复杂计算或文件i/o。如果采用传统的顺序处理方式,即使单条记录的处理时间很短,累积起来也可能导致整个流程耗时数小时,严重影响系统吞吐量和用户体验。

Java 8引入的CompletableFuture为异步编程和并行处理提供了强大的支持,它能够帮助我们有效地将这些耗时任务分解并并行执行,从而显著缩短总处理时间。然而,不恰当的使用方式也可能导致并行能力无法充分发挥,甚至退化为串行执行。

2. 初始尝试与常见陷阱分析

许多开发者在尝试使用CompletableFuture进行并行处理时,可能会遇到一个常见问题:尽管代码看起来像是并行的,但实际执行却仍然是串行的。以下是一个典型的错误示例:

import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// 假设存在以下辅助类和方法
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { public Optional<ProcessResult> methodA(ListItem item) { /* ... */ } }
// class MyProcessor {
//     private MyService service = new MyService();
//     private OutputBean mapToBean(ProcessResult result, ListItem originalItem) { /* ... */ }
//     public List<OutputBean> executeListPart(List<ListItem> subList) {
//         return subList.stream()
//                       .map(listItem -> service.methodA(listItem)
//                                               .map(result -> mapToBean(result, listItem)))
//                       .flatMap(Optional::stream)
//                       .collect(Collectors.toList());
//     }
// }

public class ParallelProcessingIncorrect {

    // 假设这是您的列表和处理器实例
    private static List<ListItem> largeList = /* 初始化一个包含50k ListItem的列表 */;
    private static MyProcessor processor = new MyProcessor();

    public static void main(String[] args) {
        int noOfCores = Runtime.getRuntime().availableProcessors();
        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1);

        try {
            long startTime = System.currentTimeMillis();
            List<OutputBean> results = Lists.partition(largeList, 500).stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> processor.executeListPart(item), service))
                    // 核心问题:在这里调用 CompletableFuture::join
                    .map(CompletableFuture::join)
                    .flatMap(List::stream)
                    .collect(Collectors.toList());
            long endTime = System.currentTimeMillis();
            System.out.println("Incorrect approach total time: " + (endTime - startTime) + " ms");
            System.out.println("Processed " + results.size() + " items.");

        } finally {
            service.shutdown();
        }
    }
}

上述代码的问题在于 .map(CompletableFuture::join) 这一行。CompletableFuture.join() 方法是一个阻塞操作,它会等待当前 CompletableFuture 完成并返回其结果。这意味着,当 Stream 处理第一个分区的 CompletableFuture 时,它会立即阻塞并等待该分区的所有任务完成,然后才能继续处理下一个分区的 CompletableFuture。结果是,尽管每个分区内部的任务可能在单独的线程中执行,但不同分区之间的处理却是严格串行的,从而失去了并行处理的优势。

3. 正确的CompletableFuture并行处理策略

要实现真正的并行执行,关键在于将异步任务的创建和结果的等待(join)分离。我们应该首先创建并提交所有异步任务,将它们的CompletableFuture实例收集到一个列表中,然后在一个单独的步骤中等待所有这些CompletableFuture完成。

立即学习Java免费学习笔记(深入)”;

以下是正确的并行处理代码示例:

AITDK
AITDK

免费AI SEO工具,SEO的AI生成器

下载
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

// 辅助类定义(与上述示例相同,此处省略以保持简洁)
// class ListItem { /* ... */ }
// class ProcessResult { /* ... */ }
// class OutputBean { /* ... */ }
// class MyService { /* ... */ }
// class MyProcessor { /* ... */ }


public class ParallelProcessingCorrect {

    private static List<ListItem> largeList; // 假设已初始化,例如:
    static {
        largeList = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            largeList.add(new ListItem("item_" + i));
        }
    }
    private static MyProcessor processor = new MyProcessor();

    public static void main(String[] args) throws InterruptedException {
        int noOfCores = Runtime.getRuntime().availableProcessors();
        ExecutorService service = Executors.newFixedThreadPool(noOfCores - 1); // 推荐线程池大小为核心数-1或根据IO/CPU密集型任务调整

        try {
            long startTime = System.currentTimeMillis();

            // 1. 创建并提交所有异步任务,收集CompletableFuture实例
            List<CompletableFuture<List<OutputBean>>> futures = Lists.partition(largeList, 500).stream()
                    .map(itemPart -> CompletableFuture.supplyAsync(() -> processor.executeListPart(itemPart), service))
                    .collect(Collectors.toList());

            // 2. 等待所有CompletableFuture完成并获取结果
            // 使用 CompletableFuture.allOf() 可以等待所有Future完成,但其本身不返回结果
            // 更好的做法是遍历futures列表,逐个join或使用allof().join()后,再map获取结果

            // 方法一:遍历futures列表,逐个join(更直接,但仍然是顺序join)
            // List<OutputBean> results = futures.stream()
            //                                  .map(CompletableFuture::join) // 此时join是等待所有任务提交后才开始
            //                                  .flatMap(List::stream)
            //                                  .collect(Collectors.toList());

            // 方法二:使用 CompletableFuture.allOf() 结合 thenApply/thenCompose(更优雅,推荐)
            CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

            List<OutputBean> results = allOf.thenApply(v -> futures.stream()
                                                .map(CompletableFuture::join) // 此时所有future都已完成,join是非阻塞的
                                                .flatMap(List::stream)
                                                .collect(Collectors.toList()))
                                        .join(); // 等待所有结果收集完成

            long endTime = System.currentTimeMillis();
            System.out.println("Correct approach total time: " + (endTime - startTime) + " ms");
            System.out.println("Processed " + results.size() + " items.");

        } finally {
            // 确保线程池关闭
            service.shutdown();
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("ExecutorService did not terminate in the specified time.");
                service.shutdownNow();
            }
        }
    }
}

工作原理:

  1. 任务提交: Lists.partition(largeList, 500).stream().map(...) 这部分会遍历所有分区,并为每个分区创建一个 CompletableFuture 任务。CompletableFuture.supplyAsync() 会将任务提交给 ExecutorService 立即执行,而不会阻塞当前的流处理。
  2. Future收集: 所有的 CompletableFuture 实例被收集到一个 List<CompletableFuture<List<OutputBean>>> futures 中。此时,所有任务可能已经开始并行执行了。
  3. 统一等待:
    • CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) 创建了一个新的 CompletableFuture,它会在所有传入的 CompletableFuture 都完成后才完成。
    • .thenApply(...) 定义了在 allOf 完成后执行的逻辑,即遍历 futures 列表,对每个 CompletableFuture 调用 join()。此时,由于 allOf 已经确保所有子任务都已完成,因此这些 join() 调用将是非阻塞的,能够立即获取结果。
    • 最后的 .join() 是等待整个结果聚合过程完成。

通过这种方式,所有分区的处理任务几乎同时开始执行,只有在需要聚合所有结果时,主线程才会被阻塞,从而实现了真正的并行加速。

4. ExecutorService的管理与考量

ExecutorService 是管理线程池的核心组件。在并行处理中,它的配置和生命周期管理至关重要。

  • 创建: Executors.newFixedThreadPool(noOfCores - 1) 是一个常见的选择,它创建一个固定大小的线程池。线程池的大小应根据任务类型(CPU密集型或I/O密集型)和系统可用资源进行调整。对于CPU密集型任务,通常设置为 CPU核心数 - 1 或 CPU核心数;对于I/O密集型任务,可以设置得更大,因为线程在等待I/O时不会占用CPU。
  • 生命周期: ExecutorService 是一个重量级资源,应该在不再需要时显式关闭。
    • service.shutdown():启动有序关闭,不再接受新任务,但会完成已提交的任务。
    • service.awaitTermination(timeout, unit):等待已提交任务完成,或直到超时。这通常与 shutdown() 配合使用,以确保所有任务在程序退出前完成。
    • service.shutdownNow():尝试立即停止所有正在执行的任务,并停止等待中的任务。这通常在 awaitTermination 超时后作为强制关闭的手段。

在应用程序的整个生命周期中,如果会频繁地进行并行处理,通常推荐复用同一个 ExecutorService 实例,而不是每次都创建和关闭新的实例,以减少资源开销。

5. 注意事项与性能优化

  • 任务粒度: 适当的任务分块大小(如 Lists.partition(list, 500))非常重要。如果分块过小,会增加任务提交和线程调度的开销;如果分块过大,则可能导致某些线程负载过重,无法充分利用并行性。最佳分块大小通常需要根据实际任务的复杂度和执行时间进行测试和调整。
  • 异常处理: 在并行任务中,异常处理变得更为复杂。CompletableFuture 提供了 exceptionally() 和 handle() 等方法来处理异步任务中可能抛出的异常。在 join() 或 get() 时,如果任务抛出异常,它们会将异常重新抛出(通常是 CompletionException 或 ExecutionException),因此需要捕获并处理。
  • 结果聚合: CompletableFuture.allOf() 结合 thenApply 是一个优雅的聚合方式。如果需要聚合不同类型的 CompletableFuture 结果,可以使用 CompletableFuture.supplyAsync(() -> future1.join()).thenCombine(future2, (r1, r2) -> ...) 等组合方法。
  • 异步上下文: 确保传递给 CompletableFuture.supplyAsync() 的 ExecutorService 是合适的,避免使用默认的 ForkJoinPool.commonPool(),因为它可能被其他库或系统任务占用,导致资源争抢。

6. 总结

通过 CompletableFuture 进行大型列表的并行处理是提升Java应用性能的有效手段。核心在于避免在任务提交阶段就阻塞性地等待结果。正确的做法是先将所有任务异步提交并收集其 CompletableFuture 实例,待所有任务均已启动或完成时,再统一进行结果的聚合。合理配置 ExecutorService、选择合适的任务粒度以及完善异常处理机制,将确保您的并行处理方案既高效又健壮。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

77

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

40

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

67

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

47

2025.11.27

数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

387

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2111

2023.08.14

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

26

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Sass 教程
Sass 教程

共14课时 | 0.9万人学习

PHP入门速学(台湾同胞版)
PHP入门速学(台湾同胞版)

共10课时 | 1.3万人学习

韩顺平 2016年 最新PHP基础视频教程
韩顺平 2016年 最新PHP基础视频教程

共47课时 | 10.6万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号