asyncio.queue不能直接当线程队列用,因其协程安全但非线程安全,跨线程使用会导致卡死或runtimeerror;爬虫中需设maxsize防内存暴涨,消费者须用try/except+task_done保障管道健壮。

asyncio.Queue 为什么不能直接当线程队列用
因为 asyncio.Queue 是协程安全的,但不是线程安全的——它只在同一个事件循环里有效。如果你在 ThreadPoolExecutor 里往 asyncio.Queue put 数据,或者跨线程 await get(),会直接卡死或抛 RuntimeError: This event loop is already running。
常见错误现象:程序启动后没报错,但消费者永远不消费;或者某次 get() 后整个协程挂住,CPU 占用归零。
- 生产者必须是 async 函数,用
await queue.put(item),不能用queue.put_nowait()除非你确定队列没满且已进入事件循环 - 消费者必须用
await queue.get(),拿到后务必调用queue.task_done(),否则join()永远等不到完成 - 别在同步函数里混用
asyncio.Queue,比如用requests.get()爬完就queue.put()—— 这会阻塞整个 event loop
爬虫生产者怎么避免撑爆内存
asyncio.Queue 默认无界,爬虫一开几十个并发,几秒内塞几千个 URL 或响应体,内存直接飙到几个 GB。这不是“快”,是失控。
使用场景:高频抓取列表页 → 解析详情页 URL → 放进队列 → 消费者发请求拿正文。
立即学习“Python免费学习笔记(深入)”;
- 初始化时显式设上限:
queue = asyncio.Queue(maxsize=100),比默认值更可控 - 生产者侧加
await queue.join()前置等待(可选),或用try/except asyncio.QueueFull做降速,但更推荐前者 - URL 和响应体尽量只传轻量结构:比如只放
{"url": "...", "meta": {...}},别把整段 HTML 字符串塞进去 - 如果解析逻辑重(如用 lxml 处理大 HTML),考虑把解析也异步化,或扔给
loop.run_in_executor,但注意别反向污染队列
消费者怎么保证异常不中断管道
一个消费者任务抛未捕获异常(比如 JSON 解析失败、字段缺失),整个 asyncio.gather() 或 asyncio.create_task() 就崩了,后续数据全丢。
参数差异:queue.get() 本身不抛异常,但你后续处理时出错,队列状态不会自动回滚。
- 每个消费者任务必须包一层
try/except Exception,至少记录logging.exception() - 出错后仍要调用
queue.task_done(),否则queue.join()死锁 - 别用
asyncio.wait_for(..., timeout=...)包整个消费逻辑,timeout 触发后 task 被 cancel,task_done()容易漏掉——改用内部超时 + 显式 done - 需要重试时,把失败项重新
await queue.put(item),但记得加计数防无限循环,比如item.setdefault("retry_count", 0)
和 aiohttp + aiomysql 搭配时的典型卡点
很多人以为 “都用 async 就天然兼容”,结果发现队列吞吐上不去,DB 写入慢得像同步,甚至连接池耗尽。
性能影响:aiohttp 的 session 应复用,aiomysql 的 pool 要提前初始化,否则每次新建连接,协程就在等 IO 上空转。
- aiohttp session 必须作为参数传入生产者/消费者函数,或用 global+lazy init,别在每次请求里
aiohttp.ClientSession() - aiomysql pool 初始化必须
await完成,且在整个应用生命周期只做一次;消费者里用async with pool.acquire() as conn:,别手动 close - 别让一个消费者既发 HTTP 请求又写 DB——拆成两级队列:第一级收响应,第二级收待入库数据,解耦 IO 类型
- 如果用
asyncio.gather(*[consumer(queue) for _ in range(5)]),确保 consumer 内部没共享状态(比如共用一个没加锁的 dict)
最常被忽略的是:队列满、连接池满、磁盘 I/O(比如写 CSV 文件)这三者看起来无关,但在高并发下会互相放大阻塞效应。压测时别只看 CPU,盯住 asyncio.Task 状态和 event loop stalled time。










