必须用 loop.run_in_executor() 当异步函数需调用无法修改的同步阻塞函数(如 requests.get、time.sleep),且该函数会等待IO或耗时较长,否则会阻塞事件循环;需 await 其返回的 Future,不可漏掉 await。

什么时候必须用 loop.run_in_executor()
当你的异步函数里不得不调用一个「没法改」的同步阻塞函数(比如 requests.get()、time.sleep()、json.loads() 里嵌了慢解析逻辑、或调第三方 SDK 的老接口),而你又不能让它卡住整个 event loop —— 这时候就得扔进线程池。不是所有同步代码都需要,但凡它会「等 IO」或「吃 CPU 时间长」,就危险。
常见错误现象:RuntimeWarning: coroutine 'xxx' was never awaited(误把同步函数当协程 await);或者整个服务响应变慢、并发数上不去,asyncio 像没开一样。
- 只对真正阻塞的调用用,别滥用——纯计算且快的函数(如简单字符串处理)没必要进线程池
- 别在 executor 里再调 asyncio 的东西(比如
await或loop.create_task()),线程里没有 event loop - 默认用
concurrent.futures.ThreadPoolExecutor,CPU 密集型才考虑ProcessPoolExecutor(但要注意进程间对象传递开销)
run_in_executor() 怎么写才不报错
核心是:它返回的是一个 Future,不是协程,所以得 await 它,而不是直接调用。很多人漏掉 await,结果得到一个 Future 对象,后续再 await 就出错。
正确姿势:
立即学习“Python免费学习笔记(深入)”;
import asyncio from concurrent.futures import ThreadPoolExecutor <p>def sync_heavy_work(x): time.sleep(2) # 模拟阻塞 return x * 2</p><p>async def main(): loop = asyncio.get_running_loop()</p><h1>✅ 正确:await 返回的 Future</h1><pre class='brush:python;toolbar:false;'>result = await loop.run_in_executor(None, sync_heavy_work, 10) print(result) # 20
-
None表示用默认线程池;传入自定义ThreadPoolExecutor实例可控制最大线程数 - 参数按顺序往后摆:
run_in_executor(executor, func, *args),不支持关键字参数传入,得包一层 - 如果
sync_heavy_work需要 keyword args,写个 lambda 或用functools.partial
线程池共享和生命周期怎么管
默认的 None 线程池由 asyncio 内部管理,启动时懒创建,关机时自动 shutdown。但如果你频繁创建/销毁 event loop(比如单元测试、CLI 工具反复跑),或者想复用固定大小的池子,就得自己管。
容易踩的坑:ThreadPoolExecutor 被 gc 掉后,还在跑的任务可能被中断;或者多个协程共用一个 executor 却没设好 max_workers,导致线程爆炸。
- 推荐在应用启动时创建一次 executor,挂到
loop上或作为全局变量,避免每次调用都新建 -
max_workers=5是常见起点,具体看阻塞函数耗时和并发量——别设太大,线程切换本身有成本 - 显式 shutdown:应用退出前调
executor.shutdown(wait=True),否则可能丢任务
比 run_in_executor() 更轻量的替代方案
不是所有“同步”都真需要线程池。比如只是短暂 sleep,用 asyncio.sleep();只是 JSON 解析,现代 json.loads() 在 CPython 下其实很快,除非数据超大;甚至有些库已有 async 版本(如 aiohttp 替代 requests)。
用错场景反而更慢:小数据 + 线程调度开销 > 同步执行时间。
- 先测:用
timeit或perf_counter看同步函数单次耗时,> 10ms 才值得进线程池 - 优先查有没有 async-native 替代品,比如数据库用
asyncpg、HTTP 用aiohttp、文件读写用aiopath - 如果只是为兼容旧代码,且调用频次低,有时加个
await asyncio.to_thread()(Python 3.9+)更简洁,它底层也用run_in_executor,但封装了参数传递
实际项目里最常被忽略的是 executor 的复用粒度和阻塞判定边界——有人把整个 for 循环包进去,有人把日志打印这种毫秒级操作也扔线程池,结果性能没提,debug 成本翻倍。










