滑动窗口限流不能只靠time.time()算时间差,因为需维护多个滑动时间桶而非单一起始时间;正确做法是用deque存(time_bucket, count)元组,每次请求先剔除过期桶再累加,并依部署模型选threading.lock或asyncio.lock,生产环境应优先用redis实现分布式原子限流。

滑动窗口限流为什么不能只靠 time.time() 算时间差
因为窗口是“滑动”的,不是固定切片;单纯用当前时间减去窗口起始时间,会漏掉跨多个小窗口的请求累积。比如 1 秒内限 10 次,但实际要支持每 100ms 滑动一次——你得存最近 10 个 100ms 的计数,而不是只记“这秒开始时间”。
- 典型错误:用一个全局
last_reset时间 + 单一计数器,导致窗口边界僵硬、突增流量被误放行 - 正确思路:维护一个有序的时间戳队列(或环形缓冲区),每次请求进来时,先剔除超时的旧记录,再累加新请求
- Python 中推荐用
collections.deque存(timestamp, count)元组,maxlen可设为窗口分片数,避免无限增长
用 deque 实现滑动窗口的核心逻辑怎么写
关键不在“加”,而在“删旧”——每次请求都得先清理过期桶,再决定是否允许通过。不清理就等于计数永远只增不减。
- 窗口总长设为
window_size_ms = 1000,分片粒度step_ms = 100,则最多存 10 个桶 - 每个桶是
(int(time.time() * 1000) // step_ms, count),用整数时间戳做 key,避免浮点误差 - 插入前遍历
deque左端,弹出所有timestamp 的项 - 示例片段:
bucket_id = int(time.time() * 1000) // 100<br>while dq and dq[0][0] < bucket_id - 10:<br> dq.popleft()<br>if len(dq) == 0 or dq[-1][0] != bucket_id:<br> dq.append([bucket_id, 0])<br>dq[-1][1] += 1<br>allowed = sum(cnt for _, cnt in dq) <= 10
threading.Lock 和 asyncio.Lock 怎么选
取决于你的服务模型。Web 框架如 Flask/FastAPI 默认同步,用 threading.Lock 就够;若用 uvicorn --workers 1 --loop uvloop 配合 async 路由,则必须用 asyncio.Lock,否则 await 会卡死。
- 常见错误:在
async函数里用threading.Lock的.acquire()—— 这不是协程,会阻塞整个 event loop - 性能影响:锁粒度越细越好,别把整个滑动窗口结构包在一个大锁里;可考虑对每个 bucket_id 做分段锁,但 Python GIL 下收益有限,通常单锁更稳
- 如果用 Redis 后端,反而不用本地锁,直接靠
INCR+EXPIRE组合实现原子滑动窗口(例如用redis-cell或自建 Lua 脚本)
为什么生产环境慎用纯内存滑动窗口
多进程部署时,每个 worker 有独立内存,限流状态不共享——用户连续请求打到不同进程,就等于绕过限制。
立即学习“Python免费学习笔记(深入)”;
- FastAPI + Uvicorn 多 worker 模式下,
deque限流完全失效,除非你上Redis或Memcached - 单进程 + 多线程安全,但扩容能力差;单进程 +
async安全,但扛不住突发连接数 - 真正落地时,90% 的场景该直接用
redis-py配合 Lua 脚本:用ZSET存时间戳+请求 ID,ZREMRANGEBYSCORE清旧,ZCARD计数,天然分布式、原子、无锁
滑动窗口看着简单,难点从来不在算法,而在状态一致性。本地内存只是开发验证用,上线前记得确认部署模型和存储边界。











