
本文介绍如何在 project reactor 中优雅地组合 mono 操作:先串行获取上游数据,再并行触发多个依赖该数据的下游操作,避免阻塞调用(如 block),充分利用响应式编程的非阻塞并发能力。
本文介绍如何在 project reactor 中优雅地组合 mono 操作:先串行获取上游数据,再并行触发多个依赖该数据的下游操作,避免阻塞调用(如 block),充分利用响应式编程的非阻塞并发能力。
在响应式编程中,滥用 block() 不仅破坏了非阻塞契约,还会导致线程阻塞、资源浪费甚至死锁风险。上述场景中,resultAMono.block() 是典型反模式——它强行将异步流转为同步调用,彻底丧失了 Reactor 的调度优势与可伸缩性。
正确的做法是通过操作符编排数据流:使用 flatMap 实现“等待 A 完成后再启动 B 和 C”,再借助 Flux.merge 或 Mono.zip 实现 B 与 C 的并行执行。
✅ 场景一:B 和 C 均返回 Mono<Void>(仅需执行,不关心结果)
当 loadB 和 loadC 仅承担副作用(如写日志、更新缓存、发送消息),且返回 Mono<Void> 时,推荐使用 Flux.merge:
Mono<List<String>> resultAMono = listA();
Mono<Void> res = resultAMono
.flatMap(a -> Flux.merge(loadB(a), loadC()).then());- Flux.merge(...) 将两个 Mono<Void> 合并为一个 Flux<Void>,并并发订阅二者;
- then() 将 Flux<Void> 转换为 Mono<Void>,语义为“等所有任务完成即发出 onComplete”;
- 整个链路保持完全非阻塞,且 B 的执行严格依赖 A 的结果 a,符合串行→并行的语义。
✅ 场景二:B 和 C 返回有值的 Mono(如 Mono<B> 和 Mono<C>)
若需同时消费 B 和 C 的结果(例如聚合后保存),应改用 Mono.zip:
Mono<B> loadB(List<String> a) { /* ... */ }
Mono<C> loadC() { /* ... */ }
Mono<Tuple2<B, C>> res = resultAMono
.flatMap(a -> Mono.zip(loadB(a), loadC()));- Mono.zip 并发执行两个 Mono,并在二者都完成时,以 Tuple2 形式合并结果;
- 支持泛型重载(如 Mono.zip(loadB(a), loadC(), (b, c) -> new Result(b, c)))实现自定义组装;
- 同样保证 B 的输入来自 A 的输出,而 B 与 C 之间无依赖、完全并行。
⚠️ 关键注意事项
- 永远避免 block():它会阻塞当前线程,在 WebFlux 或 R2DBC 等场景中极易引发线程池耗尽;
-
merge vs zip 的选择依据:
- 用 merge:关注“全部完成”,不关心中间值或顺序,适合纯副作用操作;
- 用 zip:需结构化组合结果,且对完成顺序无强要求(二者仍并发执行);
- 错误传播:两种方式均遵循 Reactor 错误传播规则——任一子 Mono 发生错误,整个链路将立即终止并发出 onError;
- 资源安全:flatMap 内部自动管理订阅生命周期,无需手动取消或清理。
通过合理选用 flatMap + merge / zip,你能在一行声明式代码中精准表达“先串后并”的复杂执行逻辑,既简洁又高效,真正践行响应式编程的设计哲学。









