不能直接用find()拿全部数据再bulkwrite(),因toarray()会将全部文档加载到内存导致爆内存;应使用游标分批流式处理,按_id范围续传、每批limit+gt查询、bulkwrite()设ordered:false并捕获writeerrors。

为什么不能直接用 find() 拿全部数据再 bulkWrite()
内存会爆。MongoDB 的 find() 默认返回游标(cursor),但如果你调 .toArray() 或遍历到内存里攒一堆文档,100 万条文档轻松吃掉几 GB 内存——尤其文档带二进制字段或嵌套深时。Node.js 单进程扛不住,Python 的 list(cursor) 同理。
正确做法是让游标“流式吐出”,每批拉固定数量(比如 1000 条),处理完立刻 bulkWrite(),不囤积。
- 用
cursor.batchSize(n)只控制网络批次,不保证每次next()拿到 n 条;真正可控的是cursor.limit()+ 循环 +skip()(不推荐)或更稳妥的find({ _id: { $gt: lastId } })游标续传 - Node.js 驱动 v6+ 推荐用
cursor.tryNext()或for await;PyMongo 用batch_size参数 + 手动分页逻辑 - 别依赖
cursor.count()做总进度——集合大时它本身就很慢,还可能不准(有写入并发)
怎么用 find() 游标 + bulkWrite() 实现稳定分批
核心是“查一批、写一批、记位置、再查下一批”,关键在如何安全标记“下一批从哪开始”。用 _id 范围分片最稳,尤其 ObjectId 是时间戳前缀,天然有序。
- 首次查询:
db.collection.find({}).sort({ _id: 1 }).limit(1000) - 拿到这批最后一条的
_id(比如lastDoc._id),下次查{ _id: { $gt: lastDoc._id } } -
bulkWrite()时加{ ordered: false },避免单条失败中断整批;失败项用writeErrors字段单独捕获,别直接抛异常 - 每批执行后加短延时(如
await new Promise(r => setTimeout(r, 10))),减轻源库和目标库压力,也防驱动报CursorNotFound
Python PyMongo 和 Node.js MongoDB Driver 的关键参数差异
同一逻辑,两边驱动行为不同,容易踩坑。
- PyMongo:
cursor.batch_size(1000)影响每次从服务器拉多少文档到本地缓冲区,但for doc in cursor:还是逐条迭代;真要分批得手动切片或用itertools.islice(cursor, batch_size) - Node.js:
cursor.limit(1000)真限制结果数;cursor.toArray()仍会加载整批进内存,必须用for await (const doc of cursor)才流式;v6+ 默认启用useUnifiedTopology: true,否则游标超时易断 - 两者都需关掉
noCursorTimeout: true(Node)或设cursor.no_cursor_timeout = True(PyMongo),否则空闲 10 分钟游标自动销毁
bulkWrite() 失败时怎么不丢数据、也不卡死
批量写失败很常见:唯一键冲突、字段类型不符、磁盘满、网络抖动。硬抛错会导致迁移中断,全量重跑成本高。
- 始终开启
{ ordered: false }(Node)或ordered=False(PyMongo),让成功项先落地 - 检查
result.writeErrors,对每个writeErrors[i].errmsg做分类:若含"duplicate key",说明目标库已有该数据,可跳过;若含"E11000",同理 - 把失败文档单独写入临时集合(如
failed_migrate_batch_20240520),后续人工核对或补漏,别混在主流程里重试——重试可能无限循环 - 每批
bulkWrite()后校验result.upsertedCount + result.modifiedCount + result.deletedCount是否等于输入条数,不等就说明有静默失败(比如权限不足导致插入被忽略)
最麻烦的不是写法,是游标生命周期和网络波动之间的博弈。哪怕逻辑写对了,没设好超时、没处理好游标失效、没给 bulkWrite() 加重试退避,跑一晚上还是可能在第 87 批崩掉。










