
本文介绍如何基于 reactor 的 mono 流式组合能力,将多个有序的 `seedpreprocessor` 串联成一个响应式处理流水线,实现输入文档的逐级异步转换。
在响应式编程中,构建可组合、可扩展的处理链(Pipeline)是常见需求。Spring WebFlux 提供了强大的 Mono 和 Flux 操作符,其中 flatMap 是实现“上一阶段输出作为下一阶段输入”的关键——它支持异步、非阻塞地将前序结果映射为新的 Mono,并自动扁平化嵌套流。
回到你的 PipeLine 实现,核心问题在于:原代码中使用 Flux.fromIterable(...).map(...) 试图一次性对所有处理器应用 process(),但 map 是同步、并发无关的转换操作,无法传递中间状态;更重要的是,proc.process(initial) 始终以原始 initial 为输入,而非前一个处理器的输出,导致无法形成真正的链式调用。
✅ 正确做法是累积式构建 Mono 链:从初始 Mono
出发,依次 flatMap 到每个处理器,使每个 process(I) 的输入动态变为前一步的输出:public Monoexecute(String url) { log.info("Start processing URL = {}", url); PreProcessorDocument initial = new PreProcessorDocument(url); Monoresult = Mono.just(initial); for (SeedPreProcessorprocessor : allProcessors) { result = result.flatMap(processor::process); } return result; }? 这种写法天然具备以下优势:
SuperCms在线订餐系统下载模板采用响应式设计,自动适应手机,电脑及平板显示;满足单一店铺外卖需求。功能:1.菜单分类管理2.菜品管理:菜品增加,删除,修改3.订单管理4.友情链接管理5.数据库备份6.文章模块:如:促销活动,帮助中心7.单页模块:如:企业信息,关于我们更强大的功能在开发中……安装方法:上传到网站根目录,运行http://www.***.com/install 自动
- 完全响应式:全程无阻塞,支持背压,适配高并发 WebFlux 场景;
- 顺序可控:依赖构造时已按 order() 排序的 allProcessors 列表,确保执行顺序严格一致;
- 错误传播自然:任一 Processor 返回 Mono.error() 或抛出异常,都会中断流水线并向下透传;
- 资源轻量:不创建中间集合或额外线程,仅通过函数式组合构建声明式流。
⚠️ 注意事项:
- 确保所有 SeedPreProcessor 实现类的 process() 方法真正返回异步完成的 Mono(例如调用 WebClient, ReactiveMongoTemplate 等),避免意外阻塞(如误用 .block());
- 若需在某处理器失败后降级处理(如默认值兜底),可在对应 flatMap 后追加 .onErrorResume(...);
- 如需记录各阶段耗时或日志,推荐使用 doOnNext, doOnSubscribe, elapsed() 等副作用操作符,而非侵入业务逻辑。
总结:响应式管道的本质不是“遍历处理器”,而是“构建数据流拓扑”。通过 flatMap 的链式累积,你获得的不仅是一段可运行代码,更是一个声明式、可观测、可组合的响应式处理单元——这正是 WebFlux 流水线设计的精髓所在。










