
本文详解如何在 asyncio 中确保子任务(如 `a_task`)在父任务(如 `fetch_values`)完成后立即并发执行,且后续任务(如 `other_task`)严格等待同一批数据的前置任务全部完成——关键在于避免阻塞调用、合理组织 await 链与并发粒度。
在你的原始代码中,看似使用了 async/await,但核心问题在于混用了同步阻塞调用与异步编程模型,导致整个协程被“冻结”,丧失并发能力。具体有两大陷阱:
- time.sleep() 是同步阻塞函数:它会完全停住当前事件循环线程,使其他协程无法调度。必须替换为 await asyncio.sleep();
- fetch_values_and_process 内部任务调度逻辑错误:原代码中 a_tasks 的构造和 await asyncio.gather(*a_tasks) 被包裹在 if not a_values.empty: 条件内,但更严重的是——other_task 被放在 if 块外部却未加 else 对齐,逻辑结构易引发误解;更重要的是,你期望每个 item 的处理流程是独立流水线:fetch_values → (并发 a_task × N) → other_task,而原写法因列表推导式提前生成所有 a_task 协程对象(但未 await),再统一 gather,破坏了按 item 隔离的执行边界。
✅ 正确做法是:
- 每个 item 启动一个独立协程(通过 asyncio.gather 并发启动);
- 在该协程内部,严格按依赖顺序 await:先 await fetch_values(item) 获取数据 → 再提取 a_values 并并发执行其 a_task → 最后 await other_task(other_values);
- 所有耗时操作(sleep、I/O 模拟)必须使用 asyncio.sleep 或真正的异步 I/O 库(如 aiohttp, aiomysql)。
以下是修复后的完整可运行示例:
import asyncio
import pandas as pd
async def execute_check():
print("execute_check")
items = [1, 2, 3, 4]
# 并发启动 4 个独立 item 处理流程
tasks = [fetch_values_and_process(item) for item in items]
await asyncio.gather(*tasks)
async def fetch_values_and_process(item):
print(f"fetch_values_and_process for item {item}")
values_df = await fetch_values(item) # ✅ 等待本 item 数据就绪
a_values = values_df[values_df["Label"] == "A"]
other_values = values_df[values_df["Label"] != "A"]
# ✅ 对当前 item 的所有 'A' 行,并发执行 a_task
if not a_values.empty:
a_tasks = [a_task(row) for _, row in a_values.iterrows()]
await asyncio.gather(*a_tasks) # ⚠️ 必须 await,否则不执行
# ✅ other_task 严格依赖本 item 的 a_task 全部完成
await other_task(other_values)
async def fetch_values(item):
print(f"fetch_values for item {item}")
await asyncio.sleep(5) # ✅ 异步等待,不阻塞事件循环
# 修正 DataFrame 数据:按 item 区分,避免混淆
return pd.DataFrame({
"Item": [item] * 4,
"Label": ["A", "B", "C", "D"]
})
async def a_task(row):
print(f"a_task for Item={row['Item']}, Label={row['Label']}")
await asyncio.sleep(2)
async def other_task(other_values):
count = len(other_values)
print(f"other_task for {count} non-A rows")
await asyncio.sleep(2)
if __name__ == "__main__":
asyncio.run(execute_check())? 预期输出特征(体现正确并发与依赖):
立即学习“Python免费学习笔记(深入)”;
- fetch_values for item X 会几乎同时打印(4 个协程并发发起);
- 约 5 秒后,各 a_task 开始并发打印(因 fetch_values 已完成);
- a_task 执行约 2 秒后,对应 other_task 紧接着执行(无跨 item 串行等待);
- 不同 item 的 a_task 和 other_task 可能交错,但每个 item 内部顺序严格保证:fetch → a_tasks → other_task。
⚠️ 重要注意事项:
- 若 fetch_values 实际对接 HTTP/API,务必使用 aiohttp 等异步客户端,而非 requests(它是同步阻塞的);
- pandas 本身非异步库,其计算操作(如 .iterrows()、布尔索引)是 CPU 绑定同步行为,不影响事件循环,但大量计算建议用 asyncio.to_thread() 或 concurrent.futures.ThreadPoolExecutor 卸载;
- 避免在协程中直接调用任何含 time.sleep、input()、requests.get() 等同步阻塞函数——这是异步失效的最常见原因。
掌握“每个协程封装一个完整业务单元 + 依赖步骤显式 await + 所有延迟用 asyncio.sleep 替代”这三点,就能精准控制异步任务的并发粒度与执行时序。










