
Python 异步队列(asyncio.Queue)是构建高并发、低延迟异步任务调度系统的核心组件,特别适合生产者-消费者模型,比如实时日志收集、消息中转、API 请求批处理等场景。它不是线程安全的“队列”,而是专为协程设计的 awaitable 队列,必须在 asyncio 事件循环中使用。
asyncio.Queue 的基本用法与关键特性
asyncio.Queue 提供了协程友好的 put() 和 get() 方法,调用时自动挂起当前协程,直到操作可安全执行。它默认有容量限制(可设为 0 表示无界),支持 full()、empty()、qsize() 等状态查询,但要注意这些只是快照,不保证原子性。
- 所有操作必须在协程内 await,直接调用会返回协程对象而非结果
- 多个协程可安全地并发
put或get,无需额外加锁 - 支持
join()+task_done()实现任务完成同步,类似线程版queue.Queue
典型生产者-消费者模式实现
一个常见结构是:多个生产者协程持续生成数据并 put 到队列;多个消费者协程从队列 get 并处理。关键在于优雅终止——不能靠队列为空判断,而应使用哨兵值或 asyncio.Event 控制生命周期。
- 推荐用
None或自定义哨兵对象(如sentinel = object())标记结束 - 每个消费者收到哨兵后
break,并确保所有生产者结束后才关闭消费者 - 用
asyncio.gather()并发启动生产者和消费者,避免阻塞等待
实战技巧:限流、超时与错误隔离
真实业务中需考虑稳定性。例如限制每秒最多处理 N 条消息,或防止某条异常任务阻塞整个消费者。
立即学习“Python免费学习笔记(深入)”;
- 用
asyncio.sleep(1/N)在消费者内部做简单匀速消费;更精确可用aiolimiter库 -
get()支持timeout参数(如await queue.get(timeout=5)),超时抛asyncio.TimeoutError - 消费者内务必
try/except包裹处理逻辑,避免单个异常导致协程退出,影响整体吞吐
与第三方库协同:aiohttp + asyncio.Queue 构建异步爬虫管道
将队列作为请求分发与响应处理的缓冲层,能有效解耦网络 I/O 和解析逻辑。
- 生产者协程读取 URL 列表,
put到队列;可加去重、延迟、优先级(用asyncio.PriorityQueue) - 消费者协程用
aiohttp.ClientSession发起请求,拿到响应后解析并写入数据库或另一队列 - 注意控制并发请求数:用
asyncio.Semaphore包裹session.get(),防止被目标封禁










