
本文详解如何在 spring webflux 中使用 `flux.interval` 定期触发异步数据库查询,通过 `flatmap` 正确组合 `mono`(如查询最新文档)与 `flux`,实现每秒拉取并发布数据库中最新记录的描述字段。
在响应式编程中,不能将阻塞式思维套用于异步流操作。例如,直接在 map() 中调用返回 Mono
正确做法是:使用 flatMap 将每个定时信号(由 Flux.interval 发出)映射为一个新的异步查询流。flatMap 会订阅每个 Mono 并将其扁平化为 Flux 元素,从而保持整个流的响应式特性。
以下是修正后的控制器代码:
@RestController
public class WebFluxController {
@Autowired
private ReactiveDocumentRepository reactiveDocumentRepository;
@CrossOrigin
@GetMapping(value = "/documents", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux getDocuments() {
return Flux.interval(Duration.ofSeconds(1))
.onBackpressureLatest() // 防止下游消费慢导致事件积压
.flatMap(x -> reactiveDocumentRepository.findLastDocument()
.map(document -> "document-" + document.getDescription())
.defaultIfEmpty("document-NO_DATA")); // 处理查无结果场景
}
} ? 关键说明:
- produces = MediaType.TEXT_EVENT_STREAM_VALUE 显式声明返回 SSE(Server-Sent Events)格式,便于浏览器或客户端以流式方式接收持续更新;
- onBackpressureLatest() 可选但推荐,避免因数据库响应延迟或客户端读取缓慢造成内存溢出;
- defaultIfEmpty() 提供兜底值,防止 Mono.empty() 导致流中断(flatMap 遇到空 Mono 会跳过该次发射);
- findLastDocument() 必须是真正响应式的实现(如基于 Spring Data R2DBC 或 Reactive MongoDB),否则将破坏响应式契约。
⚠️ 注意事项:
- 频繁轮询数据库(如每秒一次)并非高并发场景下的最优实践。生产环境中建议结合变更通知机制(如 PostgreSQL 的 LISTEN/NOTIFY、MongoDB Change Streams)实现真正的事件驱动推送;
- 若数据库查询耗时较长,应考虑添加超时控制:
.timeout(Duration.ofSeconds(2), Mono.just("document-TIMEOUT")) - 确保 ReactiveDocumentRepository 接口正确定义了 findLastDocument() 方法,例如:
Mono
findLastDocument();
综上,flatMap 是连接定时触发器与异步数据源的核心桥梁。掌握它与 map、switchMap、concatMap 的语义差异,是构建健壮 WebFlux 流的关键一步。










