
Python的`async/await`机制旨在通过协程实现并发,但其工作原理在处理CPU密集型任务时常引人困惑。本文将深入探讨为何`await`一个纯计算任务无法交出控制权,而`await asyncio.sleep(0)`却能实现任务切换。我们将剖析事件循环的协作机制,并提供针对CPU密集型任务的正确并发策略,帮助开发者避免常见陷阱。
1. asyncio 与协程概述
asyncio是Python中用于编写并发代码的库,它通过事件循环(event loop)和协程(coroutines)实现单线程内的协作式多任务。async/await语法是定义和等待协程的核心。其设计初衷是为了高效处理I/O密集型任务,例如网络请求、文件读写等。当一个协程遇到I/O操作并使用await关键字时,它会将控制权交还给事件循环,允许事件循环调度其他“准备就绪”的协程运行,从而避免了线程切换的开销,提高了资源利用率。
2. CPU密集型任务与 await 的误区
许多开发者在初次接触asyncio时,会误以为只要在函数前加上async,并在调用时使用await,就能实现任务的并发交替执行。然而,这并非总是如此,尤其是在处理CPU密集型任务时。
考虑以下示例代码:
立即学习“Python免费学习笔记(深入)”;
import asyncio
import time
async def long_function():
"""一个纯粹的CPU密集型任务,不涉及任何I/O或异步操作。"""
print(f"Task {asyncio.current_task().get_name()}: long_function started...")
for _ in range(50_000_000): # 大量循环,模拟耗时计算
pass
print(f"Task {asyncio.current_task().get_name()}: long_function finished.")
async def count_blocking():
"""包含阻塞性CPU任务的协程。"""
for x in range(3):
print(f"Count {x} in {asyncio.current_task().get_name()}")
await long_function() # 等待一个CPU密集型任务
async def main_blocking():
"""主协程,启动两个阻塞性计数任务。"""
task1 = asyncio.create_task(count_blocking(), name="Count-A")
task2 = asyncio.create_task(count_blocking(), name="Count-B")
await asyncio.gather(task1, task2)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main_blocking())
end_time = time.perf_counter()
print(f"\n总执行时间 (阻塞): {end_time - start_time:.2f} 秒")运行上述代码,你会发现输出结果是:一个count_blocking协程会完全执行完毕,包括其内部的long_function的所有迭代,然后另一个count_blocking协程才开始执行。输出顺序会是:
Count 0 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 1 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 2 in Count-A Task Count-A: long_function started... Task Count-A: long_function finished. Count 0 in Count-B Task Count-B: long_function started... Task Count-B: long_function finished. ...
这与我们期望的交替输出(如0、0、1、1...)大相径庭。原因是await关键字本身并不具备“中断”正在执行的函数的能力。它仅仅表示“我正在等待某个异步操作完成,在此期间,你可以去执行其他准备就绪的协程”。而long_function内部是一个纯粹的计算循环,它没有任何I/O操作,也没有主动向事件循环报告它正在“等待”什么。因此,一旦事件循环将控制权交给long_function,它就会一直运行直到计算完成,期间不会释放控制权,从而阻塞了整个事件循环。
3. asyncio.sleep(0) 的作用:显式交出控制权
为了实现CPU密集型任务的协作式并发,我们需要在耗时计算中显式地将控制权交还给事件循环。asyncio.sleep(0)就是实现这一目的的常用技巧。
import asyncio
import time
async def long_function_cooperative():
"""一个协作式的CPU密集型任务,周期性地交出控制权。"""
task_name = asyncio.current_task().get_name()
# print(f"Task {task_name}: long_function_cooperative started...")
for i in range(50_000_000):
# 每隔一定次数的循环,显式地交出控制权
if i % 10_000_000 == 0 and i != 0:
# print(f"Task {task_name}: Yielding at iteration {i}")
await asyncio.sleep(0) # 关键:交出控制权
print(f"Task {task_name}: long_function_cooperative finished.")
async def count_cooperative():
"""包含协作性CPU任务的协程。"""
for x in range(3):
print(f"Count {x} in {asyncio.current_task().get_name()}")
await long_function_cooperative() # 等待一个协作性CPU任务
async def main_cooperative():
"""主协程,启动两个协作性计数任务。"""
task1 = asyncio.create_task(count_cooperative(), name="Count-X")
task2 = asyncio.create_task(count_cooperative(), name="Count-Y")
await asyncio.gather(task1, task2)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main_cooperative())
end_time = time.perf_counter()
print(f"\n总执行时间 (协作): {end_time - start_time:.2f} 秒")现在,运行这段代码,你会看到期望的交替输出:
Count 0 in Count-X Count 0 in Count-Y Count 1 in Count-X Count 1 in Count-Y Count 2 in Count-X Count 2 in Count-Y Task Count-X: long_function_cooperative finished. Task Count-Y: long_function_cooperative finished. ...
await asyncio.sleep(0)的原理是:它是一个非阻塞的异步操作,告诉事件循环“我暂时不需要CPU,你可以去检查是否有其他协程准备好了”。即使是sleep(0),它也触发了事件循环的调度机制,允许其他等待中的协程获得执行机会。这正是asyncio协作式多任务的核心体现。
4. 事件循环的工作机制
asyncio的事件循环是单线程的,它维护一个任务队列。当一个协程通过await等待一个异步操作(如网络I/O、定时器或asyncio.sleep(0))时,它会暂停执行,并将控制权交还给事件循环。事件循环会检查任务队列,选择下一个“准备就绪”的协程来运行。
- I/O密集型任务: 当协程等待网络响应时,操作系统会处理网络通信,而Python线程可以去执行其他协程。当网络数据到达时,事件循环会收到通知,然后将等待该数据的协程标记为“准备就绪”,并在合适的时机重新调度它。
- CPU密集型任务: 如果一个协程正在执行纯粹的CPU计算,它不会自动释放控制权。它会一直占用CPU,直到计算完成。除非它内部显式地调用await一个异步操作(如asyncio.sleep(0)),否则事件循环无法介入并切换到其他协程。
因此,asyncio的并发性是“协作式”的,而不是“抢占式”的。协程必须主动选择何时交出控制权。
5. CPU密集型任务的真正解决方案
虽然asyncio.sleep(0)可以在一定程度上缓解CPU密集型任务的阻塞问题,但它并不能真正实现并行计算,因为asyncio事件循环仍然运行在单个线程中。对于需要充分利用多核CPU的重度CPU密集型任务,真正的解决方案是使用多进程(multiprocessing)。
concurrent.futures模块提供了ProcessPoolExecutor,可以方便地将CPU密集型任务提交到独立的进程中执行,从而绕过Python的全局解释器锁(GIL)限制,实现真正的并行。
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def blocking_cpu_task(task_id, iterations):
"""一个阻塞的CPU密集型函数,适合在进程池中运行。"""
print(f"Process {task_id}: Starting CPU-bound task with {iterations} iterations...")
result = 0
for i in range(iterations):
result += i # 执行一些计算
print(f"Process {task_id}: Finished CPU-bound task. Result: {result % 1000}")
return f"Task {task_id} completed."
async def run_cpu_tasks_with_pool():
"""使用ProcessPoolExecutor异步运行CPU密集型任务。"""
# 使用ProcessPoolExecutor创建进程池,max_workers=None表示使用CPU核心数
with ProcessPoolExecutor(max_workers=2) as executor:
loop = asyncio.get_running_loop()
# 将CPU密集型任务提交到进程池,并等待其完成
tasks = [
loop.run_in_executor(executor, blocking_cpu_task, "Alpha", 50_000_000),
loop.run_in_executor(executor, blocking_cpu_task, "Beta", 50_000_000)
]
results = await asyncio.gather(*tasks)
print("\n所有CPU密集型任务通过ProcessPoolExecutor完成:")
for res in results:
print(res)
if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(run_cpu_tasks_with_pool())
end_time = time.perf_counter()
print(f"\n总执行时间 (ProcessPoolExecutor): {end_time - start_time:.2f} 秒")运行此代码,你会看到两个blocking_cpu_task几乎同时开始执行,并且总执行时间会接近单个任务的执行时间,因为它们在不同的CPU核心上并行运行。
注意事项:
- ThreadPoolExecutor vs ProcessPoolExecutor: concurrent.futures.ThreadPoolExecutor用于线程池。虽然它可以用于将阻塞I/O操作移出主事件循环,但由于GIL的存在,对于纯Python的CPU密集型任务,线程池无法实现真正的并行计算。因此,对于CPU密集型任务,应优先考虑ProcessPoolExecutor。
- 任务粒度: 如果CPU密集型任务可以被细分为许多小块,并且每小块的执行时间较短,那么在每小块结束后插入await asyncio.sleep(0)可能是一种权宜之计,但它增加了上下文切换的开销。对于长时间运行的、不可中断的CPU计算,使用进程池是更健壮的选择。
6. 总结
asyncio和async/await是Python实现高效并发的强大工具,但它们主要适用于I/O密集型任务。理解其协作式多任务的本质至关重要:
- await关键字只有在等待一个异步操作(如I/O、定时器或显式地交出控制权)时,才会让出控制权。
- 纯粹的CPU密集型计算会阻塞整个asyncio事件循环,直到其完成。
- await asyncio.sleep(0)可以作为一种显式交出控制权的机制,使事件循环有机会调度其他协程。
- 对于需要真正并行执行的CPU密集型任务,应使用concurrent.futures.ProcessPoolExecutor将任务提交到独立的进程中运行。
正确区分任务类型并选择合适的并发策略,是编写高效、响应迅速的Python异步应用程序的关键。










