
本文详解如何在 spring boot webflux 中构建一个每秒轮询数据库并实时推送最新记录描述的响应式数据流,重点解决 `mono` 与 `flux` 的正确组合方式,避免阻塞与类型不匹配问题。
在响应式编程中,不能将阻塞式思维套用于异步流操作。原始代码中试图在 map() 内直接“获取” Mono
正确的做法是使用 flatMap:它专为“将每个元素映射为一个新的 Publisher(如 Mono 或 Flux),并自动扁平化订阅结果”而设计。当 Flux.interval() 每秒发出一个 Long 信号时,flatMap 会触发一次 reactiveDocumentRepository.findLastDocument() 查询,并将查询结果(Mono
以下是修正后的完整控制器示例:
@RestController
public class WebFluxController {
@Autowired
private ReactiveDocumentRepository reactiveDocumentRepository;
@CrossOrigin
@GetMapping(value = "/documents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux getDocuments() {
return Flux.interval(Duration.ofSeconds(1))
.onBackpressureDrop() // 防止下游消费过慢导致内存积压
.flatMap(x -> reactiveDocumentRepository.findLastDocument()
.map(document -> "document-" + document.getDescription())
.defaultIfEmpty("document-null")); // 处理无记录场景,避免流中断
}
} ⚠️ 关键注意事项:
- 必须使用 flatMap,而非 map 或 switchMap:map 无法处理返回 Mono 的函数;switchMap 会取消前序未完成的查询,适用于“最新优先”但非“全量轮询”场景;而 flatMap 确保每次间隔都发起独立查询,符合“持续获取最新记录”的语义。
- 设置合适的响应媒体类型:添加 produces = MediaType.TEXT_EVENT_STREAM_VALUE 显式声明为 Server-Sent Events(SSE),使浏览器或客户端能以流式方式持续接收数据。
- 务必处理空结果:findLastDocument() 在数据库为空时可能返回空 Mono,调用 .defaultIfEmpty(...) 可防止流因 onComplete 提前终止,保障 Flux 持续运行。
- 背压管理不可忽视:.onBackpressureDrop() 可防止高频率查询在下游延迟时造成内存溢出;生产环境可根据需求替换为 onBackpressureBuffer() 或自定义策略。
- 数据库查询需真正响应式:确保 ReactiveDocumentRepository 基于 Spring Data R2DBC 或 Reactor MongoDB 等原生响应式驱动,而非包装阻塞式 JPA —— 否则将破坏整个响应式链路。
总结而言,WebFlux 的核心在于“声明式流编排”而非“命令式执行”。每一次数据库访问都应作为流的一个异步阶段嵌入整体管道,借助 flatMap 实现自然融合。只有严格遵循响应式契约,才能真正发挥非阻塞、高并发、低资源占用的优势。










