Change Streams 在 Go 中需 MongoDB 4.0+ 副本集,驱动依赖服务端 oplog;须正确配置 replicaSet、授权 read/changeStream 权限、妥善序列化 ResumeToken,并将过滤逻辑下沉至聚合管道。

Change Streams 在 Go 中必须用 MongoDB 4.0+ 且副本集
Go 驱动的 ChangeStream 不是“开箱即用”的功能,它依赖服务端能力:MongoDB 必须是 4.0 或更高版本,并运行在副本集(Replica Set)模式下——单机 mongod 启动或非副本集分片集群直连会直接报错 change streams are only supported on replica sets。
常见踩坑点:
- 本地开发用
docker run mongo:6.0默认启单节点,没配--replSet和初始化,Collection.Watch()会静默失败或返回空游标 - 连接字符串里没加
?replicaSet=rs0,驱动无法识别副本集拓扑,后续监听直接卡住或超时 - 使用 Atlas 免费层时,默认是副本集,但需确认集群状态页显示 “Replica Set”,而非 “Serverless Instance”(后者不支持 Change Streams)
Watch() 调用前必须确保集合有写操作权限和 oplog 可读
Watch() 底层依赖 MongoDB 的 oplog,所以执行用户必须拥有 read 权限(或更细粒度的 changeStream 权限),且数据库用户不能是 admin 数据库下的只读账号——很多团队用 root 创建用户但忘了给目标库授权。
实操建议:
立即学习“go语言免费学习笔记(深入)”;
- 创建用户时显式指定
db.createUser({user:"app", pwd:"x", roles:[{role:"read", db:"mydb"}]}),别依赖 inherited roles - 如果用
mongosh测试变更流是否生效,先手动插入一条:db.users.insertOne({name:"test", ts:new Date()}),再看 Go 程序能否收到insert类型事件 - 避免在
Watch()前做耗时初始化(如加载配置、连 Redis),否则可能错过第一条变更;建议把Watch()放在 goroutine 里尽早启动
处理 ResumeToken 失败会导致重复或丢失事件
Change Stream 不是“永远在线”的长连接,网络抖动、主从切换、客户端重启都会中断。靠 ResumeToken 恢复位置,但它的类型是 primitive.D(BSON 文档),不能直接 JSON 序列化或存字符串——很多人误存成 token.String(),下次传回时解析失败,驱动退回到全量重播或直接报错 invalid resume token。
正确做法:
- 用
token.MarshalBSON()存二进制,或转成 base64 字符串(base64.StdEncoding.EncodeToString(b)) - 恢复时用
primitive.UnmarshalBSON(data, &token)反序列化,不是json.Unmarshal - 首次启动无 token 时传
nil,不要传空primitive.D{},否则可能触发服务端异常 - 每次成功处理完一个事件后,立刻更新本地存储的 token,而不是等 batch 结束——否则崩溃就丢最后几条
聚合管道过滤要写在 Watch() 的第一个参数,不能靠 Go 层 if 判断
Change Stream 事件默认推送所有操作(insert/update/delete/replace),如果在 Go 循环里用 if event.OperationType == "update" 过滤,等于把全部变更都拉到内存再筛——浪费带宽、增加延迟、还可能因 OOM 导致游标关闭。
应该把条件下沉到服务端:
- 用
mongo.Pipeline{{{"$match", bson.D{{"operationType", "update"}}}}作为Watch()第一个参数 - 想监听某字段变更?写
{"$match", bson.D{{"updateDescription.updatedFields.fieldA", bson.M{"$exists": true}}}} - 注意:管道里不能用
$lookup或其它需要读取其他集合的操作,Change Stream pipeline 仅支持$match、$addFields、$project等有限阶段 - 测试管道语法是否合法,可先在
mongosh里跑db.collection.watch([{$match:{...}}])看是否报错
Change Stream 的稳定性高度依赖副本集心跳、oplog 截断窗口和客户端 token 管理精度。哪怕只是多存一个字段、少 decode 一次 BSON,都可能让监听在高并发下漏掉关键事件。










