asyncio.queue是python中用于协程间安全传递数据的标准异步队列,支持生产者-消费者模型、并发限制、异常传递、超时控制及动态任务调度。

如果您在使用 Python 编写高并发异步任务时需要安全地在协程间传递数据,则 asyncio.Queue 是 asyncio 提供的标准异步队列实现。以下是其典型应用场景与使用方式:
一、基本生产者-消费者模型
asyncio.Queue 专为协程环境设计,内部自动处理 await 等待逻辑,避免竞态条件,适用于多个协程并发读写场景。它支持 await queue.put() 和 await queue.get(),且具备阻塞式等待能力。
1、创建一个容量为 5 的异步队列:queue = asyncio.Queue(maxsize=5)
2、定义异步生产者协程,在循环中调用 await queue.put(item) 插入数据
立即学习“Python免费学习笔记(深入)”;
3、定义异步消费者协程,通过 await queue.get() 获取数据,并在处理完毕后调用 queue.task_done()
4、使用 await queue.join() 阻塞等待所有已放入队列的任务被消费者处理完成
二、限制并发数量的异步任务调度
通过控制队列容量与消费者数量,可硬性限制同时运行的异步任务数,防止资源耗尽或触发限流。队列充当缓冲区与节流阀双重角色。
1、初始化一个 maxsize=3 的队列:semaphore_queue = asyncio.Queue(maxsize=3)
2、每个待执行的异步任务先 await semaphore_queue.put(None) 占位
3、执行实际业务逻辑后,立即调用 semaphore_queue.get_nowait() 并 semaphore_queue.task_done()
4、启动固定数量(如 3 个)消费者协程持续从该队列取值,确保任意时刻最多 3 个任务处于活跃状态
三、跨协程传递异常与终止信号
asyncio.Queue 可承载任意 Python 对象,包括 Exception 实例或自定义哨兵对象,用于协调协程生命周期与错误传播。
1、定义哨兵值:STOP_SIGNAL = object()
2、生产者在完成数据生成后,向队列放入 await queue.put(STOP_SIGNAL)
3、消费者每次 await queue.get() 后判断是否为哨兵,若是则 break 退出循环
4、当需传递异常时,生产者可放入 await queue.put(RuntimeError("failed")),消费者捕获后重新抛出
四、带超时控制的数据获取
为避免协程无限等待队列数据,可结合 asyncio.wait_for 使用,对 get 操作施加时间约束,提升系统响应确定性。
1、准备队列并启动生产者协程填充数据
2、在消费者中使用:try: item = await asyncio.wait_for(queue.get(), timeout=2.0) except asyncio.TimeoutError: print("等待数据超时")
3、成功获取后必须调用 queue.task_done(),否则 join 将无法完成
4、timeout 值应根据业务延迟特征设定,不能设为 0 或负数
五、与 asyncio.create_task 配合实现动态工作流
利用 Queue 在运行时接收新任务描述,使工作流具备动态扩展能力,适用于事件驱动型异步服务。
1、启动主调度协程,持续 await queue.get() 获取任务配置字典
2、解析配置后调用 asyncio.create_task(worker_func(**config)) 启动子协程
3、子协程执行完毕后,将结果写入另一个输出队列:await result_queue.put(result)
4、主协程不等待子任务完成,仅负责分发;可通过额外协程监听 result_queue 收集汇总










