0

0

MongoDB Change Stream:如何暂停与恢复变更流

聖光之護

聖光之護

发布时间:2026-02-26 20:03:11

|

722人浏览过

|

来源于php中文网

原创

MongoDB Change Stream:如何暂停与恢复变更流

本文详解如何在 Spring Data MongoDB Reactive 中动态暂停和恢复 Change Stream,利用 Disposable 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。

本文详解如何在 spring data mongodb reactive 中动态暂停和恢复 change stream,利用 `disposable` 控制订阅生命周期,并结合 resume token 实现断点续传,适用于数据库维护等场景。

在响应式应用中,MongoDB 的 Change Stream 是实现实时数据同步的核心机制。但实际运维中(如集合重建、索引优化或批量迁移),常需临时中止监听、执行维护操作、再从断点精准恢复——而非简单重启流导致事件丢失或重复。Spring Data MongoDB Reactive 提供了基于 Project Reactor 的 Flux 支持,其生命周期完全由 Disposable 管理,这正是实现“可控暂停/恢复”的关键。

✅ 暂停 Change Stream:取消订阅即可

Change Stream 本质是一个冷流(cold Flux),其执行依赖于下游订阅。调用 dispose() 即可立即终止当前订阅,释放资源并停止事件推送:

// 启动监听并持有 Disposable 引用
private volatile Disposable currentSubscription;

public void startWatching() {
    if (currentSubscription == null || currentSubscription.isDisposed()) {
        currentSubscription = reactiveMongoTemplate
            .changeStream("collection", 
                ChangeStreamOptions.builder()
                    .returnFullDocumentOnUpdate()
                    .build(), 
                Example.class)
            .filter(event -> event.getOperationType() != null)
            .mapNotNull(ChangeStreamEvent::getBody)
            .subscribe(
                example -> exampleService.doSomething(example),
                error -> log.error("Change stream error", error),
                () -> log.info("Change stream completed")
            );
    }
}

public void stopWatching() {
    if (currentSubscription != null && !currentSubscription.isDisposed()) {
        currentSubscription.dispose(); // ✅ 立即停止流,无副作用
        currentSubscription = null;
    }
}

⚠️ 注意:dispose() 是非阻塞、即时生效的操作,不会等待正在处理的事件完成,因此需确保 doSomething() 方法具备幂等性或事务一致性。

Replit Agent
Replit Agent

Replit最新推出的AI编程工具,可以帮助用户从零开始自动构建应用程序。

下载

? 恢复 Change Stream:利用 Resume Token 实现断点续传

单纯重启流会从最新时间点开始,丢失暂停期间的变更。要真正“从断点恢复”,必须在暂停前保存 resumeToken,并在恢复时传入:

private volatile Bson resumeToken;

// 修改 watch() 方法,捕获并缓存 resumeToken
public Flux<ChangeStreamEvent<Example>> watchWithResumeSupport() {
    return reactiveMongoTemplate.changeStream(
        "collection",
        ChangeStreamOptions.builder()
            .returnFullDocumentOnUpdate()
            .resumeAfter(resumeToken) // ? 关键:恢复时指定 token
            .build(),
        Example.class
    ).doOnNext(event -> {
        // 持久化最新 resumeToken(建议存入 Redis 或本地内存,避免单点故障)
        resumeToken = event.getResumeToken();
    });
}

// 恢复监听(调用前确保 resumeToken 已设置)
public void resumeWatching() {
    if (resumeToken == null) {
        log.warn("No resume token available; starting from latest");
    }
    currentSubscription = watchWithResumeSupport()
        .filter(event -> event.getOperationType() != null)
        .mapNotNull(ChangeStreamEvent::getBody)
        .subscribe(example -> exampleService.doSomething(example));
}

? 最佳实践与注意事项

  • Token 存储可靠性:resumeToken 应定期持久化(如每 5 秒写入 Redis),避免进程崩溃导致 token 丢失;生产环境推荐使用带 TTL 的分布式存储。
  • 空 token 处理:首次启动或 token 无效时,resumeAfter(null) 等价于从最新时间开始,符合预期。
  • 错误重试策略:网络中断等异常应触发自动重连 + token 恢复,可结合 retryWhen() 封装健壮流:
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
        .filter(throwable -> throwable instanceof MongoSocketReadException))
  • 资源清理:dispose() 后务必置空引用,防止内存泄漏;建议配合 @PreDestroy 在 Bean 销毁时兜底清理。

通过 Disposable 精确控制生命周期 + resumeAfter() 保障语义连续性,你可以在不牺牲数据一致性的前提下,灵活调度 Change Stream,真正将变更监听纳入可控运维体系。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
spring框架介绍
spring框架介绍

本专题整合了spring框架相关内容,想了解更多详细内容,请阅读专题下面的文章。

144

2025.08.06

Java Spring Security 与认证授权
Java Spring Security 与认证授权

本专题系统讲解 Java Spring Security 框架在认证与授权中的应用,涵盖用户身份验证、权限控制、JWT与OAuth2实现、跨站请求伪造(CSRF)防护、会话管理与安全漏洞防范。通过实际项目案例,帮助学习者掌握如何 使用 Spring Security 实现高安全性认证与授权机制,提升 Web 应用的安全性与用户数据保护。

82

2026.01.26

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

400

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

248

2023.10.07

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

248

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

906

2024.03.01

登录token无效
登录token无效

登录token无效解决方法:1、检查token的有效期限,如果token已经过期,需要重新获取一个新的token;2、检查token的签名,如果签名不正确,需要重新获取一个新的token;3、检查密钥的正确性,如果密钥不正确,需要重新获取一个新的token;4、使用HTTPS协议传输token,建议使用HTTPS协议进行传输 ;5、使用双因素认证,双因素认证可以提高账户的安全性。

6461

2023.09.14

登录token无效怎么办
登录token无效怎么办

登录token无效的解决办法有检查Token是否过期、检查Token是否正确、检查Token是否被篡改、检查Token是否与用户匹配、清除缓存或Cookie、检查网络连接和服务器状态、重新登录或请求新的Token、联系技术支持或开发人员等。本专题为大家提供token相关的文章、下载、课程内容,供大家免费下载体验。

838

2023.09.14

Golang 实际项目案例:从需求到上线
Golang 实际项目案例:从需求到上线

《Golang 实际项目案例:从需求到上线》以真实业务场景为主线,完整覆盖需求分析、架构设计、模块拆分、编码实现、性能优化与部署上线全过程,强调工程规范与实践决策,帮助开发者打通从技术实现到系统交付的关键路径,提升独立完成 Go 项目的综合能力。

1

2026.02.26

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 5.6万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号