高并发写入应采用队列削峰:内存级用 asyncio.queue(适合qps几百至一两千),分布式推荐 redis list + worker 进程(支持水平扩展);需搭配异步数据库驱动、连接池、批量写入及入口限流,辅以监控告警。

Python 处理高并发写入时,直接落库或写文件容易导致数据库连接打满、磁盘 IO 阻塞、响应超时甚至服务崩溃。用队列削峰是成熟且实用的方案——把瞬时大量写请求暂存到缓冲队列,再由后台消费者以可控速率消费处理。
用 asyncio.Queue 做内存级削峰(适合中低流量)
适用于单进程内、QPS 在几百到一两千的场景,轻量、无外部依赖。
- 生产者(如 FastAPI/Flask 接口)将写任务(如字典、JSON)非阻塞地 put_nowait() 到 asyncio.Queue
- 启动 1~3 个后台 asyncio task 持续 get() 并批量写入数据库(例如每 100 条 or 每 100ms 批量 commit)
- 设置 queue 的 maxsize(如 500),满时可快速失败(返回 429)或丢弃旧任务,避免 OOM
用 Redis List + worker 进程做分布式削峰(推荐生产环境)
跨进程、跨机器、支持水平扩展,适合真实高并发(QPS 上万+)。
- Web 请求通过 redis.rpush() 将数据推入 list(如 write_queue),操作毫秒级完成
- 独立的 worker 进程(可用 Python + redis.blpop 或 Celery)持续监听队列,一次 pop 多条(如 10~50 条)批量处理
- worker 写 DB 前可做去重、校验、聚合;失败任务可 requeue 或进死信队列(dlq:write_queue)人工干预
搭配异步数据库驱动提升吞吐(关键细节)
光有队列不够,下游写入也得跟上节奏:
立即学习“Python免费学习笔记(深入)”;
- 用 asyncpg(PostgreSQL)或 aiomysql 替代同步驱动,避免 worker 被 DB I/O 阻塞
- 开启连接池(如 asyncpg.create_pool(min_size=5, max_size=20)),复用连接,减少 handshake 开销
- 批量写优先用 executemany() 或原生 COPY 协议(PostgreSQL 的 copy_from),比逐条 insert 快 10x+
加一层简单限流保底(防雪崩)
队列只是缓冲,不是无限兜底。建议在入口加轻量限流:
- 用 redis.incr() + expire 实现滑动窗口计数(如 1 秒最多 500 次写请求)
- 超限时直接返回 429 Too Many Requests,不进队列,保护后端资源
- 监控队列长度(llen write_queue)、worker 消费延迟、失败率,触发告警(如队列 > 1w 条持续 30s)










