asyncio.semaphore 是控制异步任务并发数最直接、轻量且可靠的选择,需用 async with 确保自动释放,避免手动 acquire/release;其他方案如 threadpoolexecutor、asyncio.wait() 或 aiojobs 各有适用场景,不可滥用。

asyncio.Semaphore 控制并发数量最直接
想限制同时跑的异步任务数,asyncio.Semaphore 是标准库里最轻量、最可控的选择。它不改写协程逻辑,只在关键入口加一层“闸门”,适合绝大多数 IO 密集型场景(比如并发调用 HTTP 接口、数据库查询)。
常见错误是把 semaphore.acquire() 写成同步调用,或者忘了用 async with 自动释放——漏掉释放会导致后续所有任务永久阻塞。
- 必须用
async with semaphore:包裹临界区,不能手动await semaphore.acquire()+semaphore.release()(容易出错) - 初始化时传入的整数就是最大并发数,比如
asyncio.Semaphore(5)表示最多 5 个任务同时执行 - 如果任务本身可能抛异常,
async with仍能保证释放,比手写 try/finally 更可靠
import asyncio
<p>sem = asyncio.Semaphore(3)</p><p>async def fetch(url):
async with sem: # 这里会自动 acquire/release
await asyncio.sleep(1) # 模拟网络请求
return f"done {url}"</p><p>tasks = [fetch(f"<a href="https://www.php.cn/link/83736f851256ebfd392d677baefc775d">https://www.php.cn/link/83736f851256ebfd392d677baefc775d</a>}") for i in range(10)]
results = await asyncio.gather(*tasks)</p>concurrent.futures.ThreadPoolExecutor 不适合纯 async 场景
有人看到“并发控制”就下意识套用 ThreadPoolExecutor 的 max_workers,但它本质是为同步函数设计的。强行把 await 协程丢进去,会触发 RuntimeWarning: coroutine 'xxx' was never awaited,甚至静默失败。
立即学习“Python免费学习笔记(深入)”;
只有当你明确需要混用同步阻塞代码(比如旧版 requests、PIL 图片处理)且无法改造成 async 版本时,才考虑它——但此时你已不在纯异步路径上,控制目标也变了。
客客出品专业威客系统英文名称KPPW,也是keke produced professional witkey的缩写。KPPW是一款基于PHP+MYSQL技术构架的威客系统,积客客团队多年实践和对威客模式商业化运作的大量调查分析而精心策划研发,是您轻松搭建威客网站的首选利器。KPPW针对威客任务和商品交易模式进行了细致的分析,提供完善威客任务流程控制解决方案,并将逐步分享威客系统专业化应用作为我们的
-
asyncio.to_thread()(Python 3.9+)可安全桥接同步函数,配合ThreadPoolExecutor使用,但并发数仍得靠外层Semaphore或asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)控制 - 不要对
async def函数用loop.run_in_executor(),它不会等待协程完成,只是提交一个未 await 的对象 - 如果你的任务全是
httpx.AsyncClient或aiomysql这类原生 async 库,绕过线程池更干净
asyncio.wait() + limit 参数容易误解
asyncio.wait() 本身没有内置并发数限制参数。网上有些示例用 return_when=asyncio.FIRST_COMPLETED 配合循环调度,看似能控流,但实际逻辑复杂、易漏错误处理,还可能因任务取消导致 wait() 返回空集合而卡死。
它更适合“等一批里的任意一个完成”,而不是“始终保持 N 个运行中”。真要手写调度器,不如直接用 asyncio.Semaphore 简洁。
- 别写
await asyncio.wait(chunk, return_when=asyncio.FIRST_COMPLETED)来模拟限流——chunk 大小 ≠ 并发数,因为任务完成时间不确定 - 如果必须动态调整并发数(比如根据响应延迟升降级),
Semaphore的_value是私有属性,不能直接改;应重建新实例或封装一层带 setter 的 wrapper -
asyncio.as_completed()返回的是完成顺序,也不自带限流,仍需配合Semaphore使用
第三方库如 aiojobs 适合长期后台任务管理
如果你的任务是常驻服务里的后台作业(比如定时清理、消息轮询),aiojobs 提供了 Scheduler 和自动错误日志、优雅关闭等能力,比裸写 Semaphore 更省心。但它不是为一次性批量任务设计的。
典型误用是拿它替代 gather + Semaphore 做短平快的并发请求,反而引入不必要的生命周期管理开销。
- 安装后用
await scheduler.spawn(coro)提交任务,scheduler.limit控制并发上限 - 进程退出前必须
await scheduler.close(),否则可能丢失未完成任务 - 错误默认只 log 不抛出,调试时容易忽略失败任务,建议显式加
try/except或监听scheduler.error
真正难的不是选哪个 API,而是判断“并发数”到底该设多少:它取决于下游服务的承受力、本地事件循环的调度开销、以及任务本身的 IO 耗时波动。设太高可能触发对方限流或本地内存暴涨,设太低又浪费资源——这个阈值得实测,不能只看文档推荐值。









