0

0

Reactor框架中非阻塞地聚合多个Flux流结果到单个Mono对象

DDD

DDD

发布时间:2025-12-03 15:50:33

|

868人浏览过

|

来源于php中文网

原创

reactor框架中非阻塞地聚合多个flux流结果到单个mono对象

本文将深入探讨在Project Reactor框架中,如何高效且非阻塞地将多个独立的Flux流的聚合结果合并为一个单一的Mono对象。通过详细分析常见的错误模式,并引入Reactor提供的zip操作符,我们将展示如何优雅地实现这一目标,确保应用程序的响应性和并发性。

引言:响应式数据流聚合的挑战

响应式编程中,我们经常需要从不同的异步源获取数据流,并在所有数据都可用后将它们组合成一个统一的结果对象。例如,您可能需要从两个不同的服务获取成功账户列表和失败账户列表,然后将它们封装在一个Payments对象中。

一个常见的错误尝试是,在获取到Flux流后,立即调用collectList().subscribe()来获取数据,并尝试在订阅回调外部构建结果。然而,这种做法通常会导致阻塞,因为它试图在响应式流完成之前,同步地访问其结果。在Reactor中,subscribe()方法是非阻塞的,但如果您在订阅回调之外立即依赖其副作用来构建一个同步对象,那么在异步操作完成之前,您将无法获得所需的数据,从而引入阻塞或不确定的行为。

考虑以下数据模型和初始的错误尝试:

package org.example;

import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.util.List;

@Getter
@Builder
@ToString
public class Payments {
    private List successAccounts;
    private List failedAccounts;

    @Getter
    @Builder
    @ToString
    public static class SuccessAccount {
        private String name;
        private String accountNumber;
    }

    @Getter
    @Builder
    @ToString
    public static class FailedAccount {
        private String name;
        private String accountNumber;
        private String errorCode;
    }
}

以及一个试图聚合的错误方法:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;

public class Main {

    public static Mono getPaymentDataIncorrect() {
        Flux accountsSucceeded = getAccountsSucceeded();
        Flux accountsFailed = getAccountsFailed();

        List successAccounts = new ArrayList<>();
        List failedAccounts = new ArrayList<>();

        // 这种方式是阻塞的,因为它试图在异步操作完成前同步地填充列表
        accountsFailed.collectList().subscribe(failedAccounts::addAll);
        accountsSucceeded.collectList().subscribe(successAccounts::addAll);

        // 在此处,successAccounts和failedAccounts可能还未被填充
        return Mono.just(Payments.builder()
                .failedAccounts(failedAccounts)
                .successAccounts(successAccounts)
                .build());
    }
    // ... getAccountsSucceeded() 和 getAccountsFailed() 方法省略,与原始问题相同
}

上述代码中的accountsFailed.collectList().subscribe(failedAccounts::addAll)和accountsSucceeded.collectList().subscribe(successAccounts::addAll)虽然subscribe本身是非阻塞的,但它不会立即填充failedAccounts和successAccounts。当Mono.just()被调用时,这两个列表很可能仍然是空的,因为订阅的回调是异步执行的。这导致了逻辑上的错误,并且如果强制同步等待,则会引入阻塞。

使用 zip 操作符实现非阻塞聚合

Project Reactor提供了zip操作符来解决这种场景。zip操作符能够将多个Publisher(例如Mono或Flux)的元素按照索引进行组合,当所有参与的Publisher都发出一个元素时,zip操作符会将这些元素组合成一个新的元素。

NewsBang
NewsBang

盛大旗下AI团队推出的智能新闻阅读App

下载

在我们的案例中,我们需要将两个Flux流的最终聚合结果(即List)组合起来。首先,我们可以使用collectList()操作符将每个Flux转换为一个Mono,表示该流所有元素的列表。然后,我们就可以使用Mono.zipWith()来组合这两个Mono

Mono.zipWith()接受另一个Mono作为参数,以及一个BiFunction(或更高阶的函数,如zip有多个重载),该函数定义了如何将两个Mono发出的结果组合成一个新的结果。

以下是使用zipWith操作符的正确实现:

package org.example;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;

public class Main {

    public static void main(String[] args) {
        getPaymentData().subscribe(System.out::println);
    }

    public static Mono getPaymentData() {
        // 1. 获取两个独立的Flux流
        Flux accountsSucceededFlux = getAccountsSucceeded();
        Flux accountsFailedFlux = getAccountsFailed();

        // 2. 将每个Flux转换为一个Mono
        // collectList() 会收集Flux中的所有元素,并在Flux完成时发出一个包含这些元素的List
        Mono> failedAccountsMono = accountsFailedFlux.collectList();
        Mono> successAccountsMono = accountsSucceededFlux.collectList();

        // 3. 使用 Mono.zipWith() 组合两个 Mono
        // zipWith 会等待两个Mono都发出其结果,然后使用提供的BiFunction进行组合
        Mono combined = failedAccountsMono.zipWith(
                successAccountsMono,
                (failedAccounts, successAccounts) -> Payments.builder()
                        .failedAccounts(failedAccounts)
                        .successAccounts(successAccounts)
                        .build()
        );
        return combined;
    }

    // 模拟获取成功账户的Flux流
    public static Flux getAccountsSucceeded() {
        return Flux.just(Payments.SuccessAccount.builder()
                        .accountNumber("1234345")
                        .name("Payee1")
                        .build(),
                Payments.SuccessAccount.builder()
                        .accountNumber("83673674")
                        .name("Payee2")
                        .build());
    }

    // 模拟获取失败账户的Flux流
    public static Flux getAccountsFailed() {
        return Flux.just(Payments.FailedAccount.builder()
                        .accountNumber("12234345")
                        .name("Payee3")
                        .errorCode("8938")
                        .build(),
                Payments.FailedAccount.builder()
                        .accountNumber("3342343")
                        .name("Payee4")
                        .errorCode("8938")
                        .build());
    }
}

在这个修正后的实现中:

  1. getAccountsSucceeded() 和 getAccountsFailed() 方法返回了两个独立的 Flux 流。
  2. accountsFailedFlux.collectList() 和 accountsSucceededFlux.collectList() 将这两个 Flux 转换为两个 Mono。这些 Mono 会在各自的 Flux 完成收集所有元素后发出一个 List。
  3. failedAccountsMono.zipWith(successAccountsMono, ...) 操作符会等待 failedAccountsMono 和 successAccountsMono 都发出它们的 List 结果。一旦两个结果都可用,zipWith 会调用提供的 BiFunction(在本例中是一个Lambda表达式),将这两个 List 作为参数传入,并使用它们构建一个 Payments 对象。
  4. 最终,zipWith 操作符返回一个 Mono,它会在 Payments 对象成功构建后发出该对象。整个过程是非阻塞的,并且完全符合响应式编程范式。

关键概念与优势

  • 非阻塞性: zip 操作符是完全非阻塞的。它不会在等待上游Publisher发出元素时阻塞当前线程。相反,它会注册订阅,并在元素可用时异步地处理它们。
  • 并发执行: zip 操作符的两个上游Publisher(在本例中是两个 collectList() 操作)可以并发地执行。这意味着获取成功账户和失败账户的数据流可以同时进行,从而提高整体效率。
  • 结果组合的原子性: zip 确保只有当所有参与的Publisher都准备好发出一个元素时,组合函数才会被调用。这保证了在创建 Payments 对象时,所需的两个 List 数据都是完整且可用的。
  • 错误处理: 如果任何一个上游 Mono 在发出其 List 之前失败,zip 操作符将立即传播该错误,而不会等待其他 Mono 完成。

总结

在Project Reactor中,当需要将多个独立的异步数据流(Flux或Mono)的最终结果聚合成一个单一的响应式对象时,zip操作符是首选的非阻塞解决方案。通过将每个Flux首先转换为一个Mono(使用collectList()),然后利用Mono.zipWith()结合一个自定义的组合函数,可以优雅且高效地实现复杂的聚合逻辑,同时保持应用程序的响应性和并发性。避免在响应式流中进行同步阻塞操作是构建高性能、可伸缩的响应式系统的关键。

相关专题

更多
lambda表达式
lambda表达式

Lambda表达式是一种匿名函数的简洁表示方式,它可以在需要函数作为参数的地方使用,并提供了一种更简洁、更灵活的编码方式,其语法为“lambda 参数列表: 表达式”,参数列表是函数的参数,可以包含一个或多个参数,用逗号分隔,表达式是函数的执行体,用于定义函数的具体操作。本专题为大家提供lambda表达式相关的文章、下载、课程内容,供大家免费下载体验。

204

2023.09.15

python lambda函数
python lambda函数

本专题整合了python lambda函数用法详解,阅读专题下面的文章了解更多详细内容。

190

2025.11.08

Python lambda详解
Python lambda详解

本专题整合了Python lambda函数相关教程,阅读下面的文章了解更多详细内容。

47

2026.01.05

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

2

2026.01.16

全民K歌得高分教程大全
全民K歌得高分教程大全

本专题整合了全民K歌得高分技巧汇总,阅读专题下面的文章了解更多详细内容。

0

2026.01.16

C++ 单元测试与代码质量保障
C++ 单元测试与代码质量保障

本专题系统讲解 C++ 在单元测试与代码质量保障方面的实战方法,包括测试驱动开发理念、Google Test/Google Mock 的使用、测试用例设计、边界条件验证、持续集成中的自动化测试流程,以及常见代码质量问题的发现与修复。通过工程化示例,帮助开发者建立 可测试、可维护、高质量的 C++ 项目体系。

10

2026.01.16

java数据库连接教程大全
java数据库连接教程大全

本专题整合了java数据库连接相关教程,阅读专题下面的文章了解更多详细内容。

33

2026.01.15

Java音频处理教程汇总
Java音频处理教程汇总

本专题整合了java音频处理教程大全,阅读专题下面的文章了解更多详细内容。

15

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.7万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号