
本文详解如何在 spring reactive mongodb 应用中动态停止并安全恢复变更流,利用 disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。
本文详解如何在 spring reactive mongodb 应用中动态停止并安全恢复变更流,利用 disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。
在响应式 MongoDB 开发中,变更流(Change Stream)是监听集合数据实时变更的核心机制。但生产环境中常需临时中断流(例如执行索引重建、批量迁移或备份),之后从中断位置精准续订,而非丢失事件或重放全量历史。Spring Data MongoDB 的 ReactiveMongoTemplate.changeStream(...) 返回的是 Flux
✅ 正确做法:取消订阅 + 基于 Token 恢复
Flux.subscribe() 返回 Disposable,调用 .dispose() 即可立即终止当前流订阅,释放客户端资源(注意:它不会自动通知 MongoDB 服务端关闭游标,但后续无心跳将由服务端超时清理,符合预期):
// 启动变更流并持有 Disposable 引用
private volatile Disposable currentSubscription;
private volatile BsonValue lastResumeToken;
public void startWatching() {
currentSubscription = reactiveMongoTemplate
.changeStream("collection",
ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.build(),
Example.class)
.filter(event -> event.getOperationType() != null)
.doOnNext(event -> lastResumeToken = event.getResumeToken()) // 关键:持续更新 token
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(
example -> exampleService.doSomething(example),
error -> log.error("Change stream error", error),
() -> log.info("Change stream completed")
);
}
public void stopWatching() {
if (currentSubscription != null && !currentSubscription.isDisposed()) {
currentSubscription.dispose();
log.info("Change stream stopped. Last resume token: {}", lastResumeToken);
}
}? 恢复流:从上次 Token 续订
MongoDB 要求恢复时传入 resumeAfter(非 startAfter),且该 token 必须来自同一流上下文(即同一集合、相同聚合管道)。恢复代码示例如下:
public void resumeWatching() {
if (lastResumeToken == null) {
log.warn("No valid resume token available; starting from latest");
startWatching(); // 退化为新流
return;
}
// 构建带 resumeAfter 的选项
ChangeStreamOptions options = ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.resumeAfter(lastResumeToken) // ⚠️ 核心参数
.build();
currentSubscription = reactiveMongoTemplate
.changeStream("collection", options, Example.class)
.filter(event -> event.getOperationType() != null)
.doOnNext(event -> lastResumeToken = event.getResumeToken())
.mapNotNull(ChangeStreamEvent::getBody)
.subscribe(
example -> exampleService.doSomething(example),
error -> {
log.error("Resume failed, falling back to new stream", error);
// 可选:自动降级为新流(如 token 过期)
startWatching();
}
);
}⚠️ 关键注意事项
- Token 时效性:MongoDB 默认保留变更流 token 最多 5 分钟(可通过 maxAwaitTimeMS 和副本集 oplog 大小间接影响),超时后 resumeAfter 将抛出 MongoCommandException(code=234)。生产环境建议捕获该异常并优雅降级。
-
线程安全:lastResumeToken 需用 volatile 修饰,且读写应加锁或使用 AtomicReference
避免竞态。 - 不支持 pause() / resume() 方法:Reactor 的 Flux 本身无内置暂停语义;所谓“暂停”本质是取消+重建,务必依赖服务端 Resume Token 实现语义连续性。
- 避免重复消费:resumeAfter 是严格大于指定 token 的首个事件,因此不会重复投递已处理事件,满足 exactly-once 语义前提(需业务层配合幂等设计)。
✅ 总结
变更流的“暂停-恢复”不是客户端流控,而是服务端游标生命周期管理。核心路径为:
① 订阅时持续提取 event.getResumeToken() 并持久化(如内存缓存或 Redis);
② 停止时调用 Disposable.dispose();
③ 恢复时构造 ChangeStreamOptions.resumeAfter(token) 重建流。
只要 token 有效,即可实现毫秒级断点续传,完美支撑运维灰度与弹性扩缩容场景。










