0

0

MongoDB 变更流(Change Stream)的暂停与恢复实战指南

碧海醫心

碧海醫心

发布时间:2026-02-26 16:04:02

|

228人浏览过

|

来源于php中文网

原创

MongoDB 变更流(Change Stream)的暂停与恢复实战指南

本文详解如何在 spring reactive mongodb 应用中动态停止并安全恢复变更流,利用 disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。

本文详解如何在 spring reactive mongodb 应用中动态停止并安全恢复变更流,利用 disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。

在响应式 MongoDB 开发中,变更流(Change Stream)是监听集合数据实时变更的核心机制。但生产环境中常需临时中断流(例如执行索引重建、批量迁移或备份),之后从中断位置精准续订,而非丢失事件或重放全量历史。Spring Data MongoDB 的 ReactiveMongoTemplate.changeStream(...) 返回的是 Flux,其本质是冷流(cold stream),每次订阅都会新建一次服务端游标——因此“暂停”不能靠阻塞线程实现,而必须通过取消订阅 + 保存/复用 Resume Token 完成。

✅ 正确做法:取消订阅 + 基于 Token 恢复

Flux.subscribe() 返回 Disposable,调用 .dispose() 即可立即终止当前流订阅,释放客户端资源(注意:它不会自动通知 MongoDB 服务端关闭游标,但后续无心跳将由服务端超时清理,符合预期):

Runway
Runway

Runway是一个AI创意工具平台,它提供了一系列强大的功能,旨在帮助用户在视觉内容创作、设计和开发过程中提高效率和创新能力。

下载
// 启动变更流并持有 Disposable 引用
private volatile Disposable currentSubscription;
private volatile BsonValue lastResumeToken;

public void startWatching() {
    currentSubscription = reactiveMongoTemplate
        .changeStream("collection", 
            ChangeStreamOptions.builder()
                .returnFullDocumentOnUpdate()
                .build(), 
            Example.class)
        .filter(event -> event.getOperationType() != null)
        .doOnNext(event -> lastResumeToken = event.getResumeToken()) // 关键:持续更新 token
        .mapNotNull(ChangeStreamEvent::getBody)
        .subscribe(
            example -> exampleService.doSomething(example),
            error -> log.error("Change stream error", error),
            () -> log.info("Change stream completed")
        );
}

public void stopWatching() {
    if (currentSubscription != null && !currentSubscription.isDisposed()) {
        currentSubscription.dispose();
        log.info("Change stream stopped. Last resume token: {}", lastResumeToken);
    }
}

? 恢复流:从上次 Token 续订

MongoDB 要求恢复时传入 resumeAfter(非 startAfter),且该 token 必须来自同一流上下文(即同一集合、相同聚合管道)。恢复代码示例如下:

public void resumeWatching() {
    if (lastResumeToken == null) {
        log.warn("No valid resume token available; starting from latest");
        startWatching(); // 退化为新流
        return;
    }

    // 构建带 resumeAfter 的选项
    ChangeStreamOptions options = ChangeStreamOptions.builder()
        .returnFullDocumentOnUpdate()
        .resumeAfter(lastResumeToken) // ⚠️ 核心参数
        .build();

    currentSubscription = reactiveMongoTemplate
        .changeStream("collection", options, Example.class)
        .filter(event -> event.getOperationType() != null)
        .doOnNext(event -> lastResumeToken = event.getResumeToken())
        .mapNotNull(ChangeStreamEvent::getBody)
        .subscribe(
            example -> exampleService.doSomething(example),
            error -> {
                log.error("Resume failed, falling back to new stream", error);
                // 可选:自动降级为新流(如 token 过期)
                startWatching();
            }
        );
}

⚠️ 关键注意事项

  • Token 时效性:MongoDB 默认保留变更流 token 最多 5 分钟(可通过 maxAwaitTimeMS 和副本集 oplog 大小间接影响),超时后 resumeAfter 将抛出 MongoCommandException(code=234)。生产环境建议捕获该异常并优雅降级。
  • 线程安全:lastResumeToken 需用 volatile 修饰,且读写应加锁或使用 AtomicReference 避免竞态。
  • 不支持 pause() / resume() 方法:Reactor 的 Flux 本身无内置暂停语义;所谓“暂停”本质是取消+重建,务必依赖服务端 Resume Token 实现语义连续性。
  • 避免重复消费:resumeAfter 是严格大于指定 token 的首个事件,因此不会重复投递已处理事件,满足 exactly-once 语义前提(需业务层配合幂等设计)。

✅ 总结

变更流的“暂停-恢复”不是客户端流控,而是服务端游标生命周期管理。核心路径为:
① 订阅时持续提取 event.getResumeToken() 并持久化(如内存缓存或 Redis);
② 停止时调用 Disposable.dispose();
③ 恢复时构造 ChangeStreamOptions.resumeAfter(token) 重建流。
只要 token 有效,即可实现毫秒级断点续传,完美支撑运维灰度与弹性扩缩容场景。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

144

2025.08.06

Java Spring Security 与认证授权
Java Spring Security 与认证授权

本专题系统讲解 Java Spring Security 框架在认证与授权中的应用,涵盖用户身份验证、权限控制、JWT与OAuth2实现、跨站请求伪造(CSRF)防护、会话管理与安全漏洞防范。通过实际项目案例,帮助学习者掌握如何 使用 Spring Security 实现高安全性认证与授权机制,提升 Web 应用的安全性与用户数据保护。

82

2026.01.26

登录token无效
登录token无效

登录token无效解决方法:1、检查token的有效期限,如果token已经过期,需要重新获取一个新的token;2、检查token的签名,如果签名不正确,需要重新获取一个新的token;3、检查密钥的正确性,如果密钥不正确,需要重新获取一个新的token;4、使用HTTPS协议传输token,建议使用HTTPS协议进行传输 ;5、使用双因素认证,双因素认证可以提高账户的安全性。

6461

2023.09.14

登录token无效怎么办
登录token无效怎么办

登录token无效的解决办法有检查Token是否过期、检查Token是否正确、检查Token是否被篡改、检查Token是否与用户匹配、清除缓存或Cookie、检查网络连接和服务器状态、重新登录或请求新的Token、联系技术支持或开发人员等。本专题为大家提供token相关的文章、下载、课程内容,供大家免费下载体验。

838

2023.09.14

token怎么获取
token怎么获取

获取token值的方法:1、小程序调用“wx.login()”获取 临时登录凭证code,并回传到开发者服务器;2、开发者服务器以code换取,用户唯一标识openid和会话密钥“session_key”。想了解更详细的内容,可以阅读本专题下面的文章。

1087

2023.12.21

token什么意思
token什么意思

token是一种用于表示用户权限、记录交易信息、支付虚拟货币的数字货币。可以用来在特定的网络上进行交易,用来购买或出售特定的虚拟货币,也可以用来支付特定的服务费用。想了解更多token什么意思的相关内容可以访问本专题下面的文章。

1723

2024.03.01

c++中volatile关键字的作用
c++中volatile关键字的作用

本专题整合了c++中volatile关键字的相关内容,阅读专题下面的文章了解更多详细内容。

71

2025.10.23

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

721

2023.08.10

Golang 实际项目案例:从需求到上线
Golang 实际项目案例:从需求到上线

《Golang 实际项目案例:从需求到上线》以真实业务场景为主线,完整覆盖需求分析、架构设计、模块拆分、编码实现、性能优化与部署上线全过程,强调工程规范与实践决策,帮助开发者打通从技术实现到系统交付的关键路径,提升独立完成 Go 项目的综合能力。

1

2026.02.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 5.6万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号