Flask-SQLAlchemy 3.0+ 已移除 _get_debug_queries,应改用 SQLAlchemy 事件钩子(如 before_cursor_execute)结合 flask.g 实现请求级查询监控,推荐使用 flask-sqlalchemy-logger 等成熟工具。

Flask-SQLAlchemy 的 _get_debug_queries 已被移除,别再找它了
Flask-SQLAlchemy 3.0+ 版本彻底删掉了 _get_debug_queries 这个内部方法。你搜到的旧教程、Stack Overflow 答案或 GitHub Gist 里还在用它,运行时会直接报 AttributeError: 'SQLAlchemy' object has no attribute '_get_debug_queries'。这不是你配置错,是它真没了。
替代方案不是“换个写法调用它”,而是换一套机制——靠 Flask 的请求上下文 + SQLAlchemy 的事件钩子来捕获查询。
用 before_request + after_request 拦截查询耗时
核心思路:在请求开始时记录当前时间,在结束时遍历 SQLAlchemy 的查询日志(需开启 echo=True 或手动注册事件),筛选出本次请求内的慢查询。不依赖已删除的私有 API。
实操建议:
- 确保
SQLALCHEMY_ECHO=False(避免日志刷屏),改用sqlalchemy.event.listen监听engine_connect和before_cursor_execute - 用
flask.g存储请求级的查询列表和起始时间,避免多请求交叉污染 - 只对开发环境启用,生产环境用
slow_query_log配合数据库原生慢日志更稳妥 - 注意:
before_cursor_execute拿到的是原始 SQL 字符串和参数,要自己算执行耗时,不能直接读duration
示例片段(非完整可运行代码,仅示意关键链路):
from flask import g, request
from sqlalchemy import event
import time
<p>@event.listens_for(db.engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
g.setdefault("queries", [])
g.queries.append({
"statement": statement,
"start_time": time.time()
})</p><p>@event.listens_for(db.engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
if g.queries:
last = g.queries[-1]
last["duration"] = time.time() - last["start_time"]
last["parameters"] = parameters</p>查慢查询别只盯 Python 层,先看数据库有没有真正执行慢
很多所谓“Flask-SQLAlchemy 慢”,其实是 SQL 本身没走索引、N+1 查询、或返回了几万行数据在 Python 里做排序。Python 层看到的耗时,可能 90% 是数据库在等磁盘 I/O。
判断依据:
- 用
EXPLAIN ANALYZE(PostgreSQL)或EXPLAIN FORMAT=JSON(MySQL)跑对应 SQL,看实际执行计划 - 检查是否触发了全表扫描(
Seq Scan/type: ALL)、临时表(Using temporary)、文件排序(Using filesort) - 确认
query.get()和query.filter().first()的区别:前者走主键索引快,后者可能扫全表 - 警惕
relationship的 lazy 加载模式,默认'select'就是 N+1 元凶,改成'joined'或显式joinedload()
本地调试推荐用 flask-sqlalchemy-logger 而不是手写钩子
重复造轮子容易漏掉事务边界、异步请求、连接池复用等细节。有个轻量小包 flask-sqlalchemy-logger 就是干这事的:自动按请求聚合查询、标出 >100ms 的慢查询、支持输出到控制台或文件。
安装和启用很简单:
pip install flask-sqlalchemy-logger # 在 Flask app 初始化后加 from flask_sqlalchemy_logger import SQLAlchemyLogger SQLAlchemyLogger(app, threshold=100) # 单位毫秒
它底层用的正是 before_cursor_execute + g,但处理了参数脱敏、SQL 截断、并发安全等真实场景问题。你自己写的 20 行钩子,很可能在并发压测时丢掉部分查询记录。
真正难的从来不是“怎么拿到 SQL”,而是“怎么确定这条 SQL 为什么慢”——这得看执行计划、看数据分布、看锁等待,不是 Python 日志能告诉你的。











