0

0

Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

DDD

DDD

发布时间:2025-09-02 19:14:18

|

441人浏览过

|

来源于php中文网

原创

Python asyncio:从任务生成器实现高效异步并发执行的原理与实践

本教程深入探讨如何在Python asyncio中,从任务生成器实现异步任务的无阻塞并发执行。针对在不 await 任务完成的情况下,持续创建并调度新任务的需求,文章详细阐述了 asyncio 协程协作的本质,并提供了两种核心解决方案:通过 await asyncio.sleep(0) 显式让出控制权,以及利用 Python 3.11+ 的 asyncio.TaskGroup 实现更结构化的并发管理,确保任务能够真正地并行运行。

引言

在开发高性能、高并发的应用程序时,python的 asyncio 库提供了一种强大的异步编程范式。特别是在处理需要持续生成和调度任务的场景,例如长轮询服务器、事件驱动系统或数据流处理,如何有效地将这些任务添加到事件循环并确保它们能够并发执行,是一个常见的挑战。本文将深入探讨如何从一个任务生成器中,以异步、非阻塞的方式创建并执行任务,避免因等待单个任务完成而阻塞整个事件循环。

理解异步任务生成的挑战

考虑以下场景:我们有一个任务生成器,它会不断地产生新的任务参数。我们希望为每个参数创建一个异步任务,并将其提交给事件循环,但又不希望主逻辑(即生成任务的部分)停下来等待这些任务完成。

最初的尝试可能如下:

import asyncio, random 

async def wrapper(word: str):
    print(f"Executing task for: {word}")
    await asyncio.sleep(1) # 模拟耗时操作
    print(f"Finished task for: {word}")

def generator():
    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    while True:
        yield random.choice(abc)

async def manager():
    loop = asyncio.get_event_loop()
    for letter in generator():
        loop.create_task(wrapper(letter)) # 创建任务,但不等待
        # 问题在于:这里没有让出控制权,事件循环无法调度其他任务

async def main():
    await manager() # manager是一个无限循环,此处会阻塞

if __name__ == '__main__':
    # asyncio.run(manager()) # 这样调用会因为manager的无限循环而阻塞
    # 需要一种方式让manager能够持续创建任务,同时让其他任务运行
    pass # 暂时不运行,因为会阻塞

上述代码的问题在于,manager 协程内部的 for 循环会无限快速地运行,不断地调用 loop.create_task()。虽然 create_task 将 wrapper 协程包装成一个任务并提交给事件循环,但 manager 协身本身并没有任何 await 语句,这意味着它从不主动让出控制权给事件循环。结果是,事件循环没有机会去执行那些被创建的 wrapper 任务,因为 manager 始终占用着CPU。

核心概念回顾

要解决这个问题,我们需要理解 asyncio 的核心工作原理:

立即学习Python免费学习笔记(深入)”;

  • 事件循环 (Event Loop):asyncio 的核心,负责调度和执行协程。它是一个单线程的循环,通过轮询注册的协程,在协程 await 时暂停当前协程,并选择下一个准备好运行的协程执行。
  • 协程 (Coroutines) 与 任务 (Tasks):协程是可暂停和恢复的函数。asyncio.create_task() 将一个协程包装成一个 Task 对象,使其可以被事件循环调度。
  • 让出控制权 (Yielding):这是 asyncio 并发实现的关键。一个协程只有在遇到 await 表达式时,才会暂停自身并将控制权交还给事件循环。事件循环才能检查是否有其他任务准备就绪并执行它们。

解决方案一:显式让出控制权

最直接的解决方案是在 manager 协程的循环内部,显式地让出控制权。await asyncio.sleep(0) 是一个常用的技巧,它会立即暂停当前协程,并将控制权交还给事件循环。由于 sleep 的时间是0,事件循环会立即检查是否有其他任务准备就绪,并在下一个循环迭代中重新调度 manager 协程。

import asyncio, random 

async def wrapper(word: str):
    """模拟一个耗时操作的异步任务"""
    print(f"Executing task for: {word}")
    await asyncio.sleep(1) # 任务模拟
    print(f"Finished task for: {word}")

def generator():
    """一个无限生成随机字母的生成器"""
    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    while True:
        yield random.choice(abc)

async def manager_with_yield():
    """
    负责从生成器获取任务并调度,通过await asyncio.sleep(0)显式让出控制权。
    """
    loop = asyncio.get_event_loop()
    print("Manager started, generating tasks...")
    for i, letter in enumerate(generator()):
        loop.create_task(wrapper(letter))
        print(f"Task {i+1} created for '{letter}'")
        await asyncio.sleep(0) # 关键:让出控制权,允许其他任务运行
        # 实际应用中,可以根据需要增加一个短暂的等待,例如 await asyncio.sleep(0.01)
        # 或者在处理一定数量任务后才让出控制权,以平衡调度开销和响应性。

        if i >= 10: # 示例:限制生成任务的数量,否则会无限运行
            print("Generated 10 tasks, stopping manager.")
            break

async def main_with_yield():
    """主入口点,运行带有显式让出控制权的manager"""
    await manager_with_yield()
    # 等待所有已创建的wrapper任务完成
    print("Manager finished, waiting for remaining tasks...")
    await asyncio.sleep(2) # 给剩余任务一些时间完成

if __name__ == '__main__':
    print("--- Running Solution 1: Explicit Yielding ---")
    asyncio.run(main_with_yield())
    print("--- Solution 1 Finished ---")

注意事项:

  • await asyncio.sleep(0) 是一种有效的让出控制权的方式,它确保事件循环有机会处理其他已调度的任务。
  • 在实际应用中,manager 协程通常不会是无限循环,或者会有一个退出条件。如果它是无限循环,并且没有其他机制(如 asyncio.run 的 timeout 参数或外部信号)来停止它,程序将持续运行。
  • 这种方法虽然有效,但在语义上 sleep(0) 可能感觉像是一个“技巧”。

解决方案二:使用 asyncio.TaskGroup (Python 3.11+)

Python 3.11 引入了 asyncio.TaskGroup,这是一种更现代、更结构化的并发管理方式。TaskGroup 提供了一个上下文管理器,可以在其中创建任务。它会自动管理这些任务的生命周期,并在退出上下文时等待所有在其内部创建的任务完成(或处理异常)。更重要的是,TaskGroup 在内部会自动处理任务的调度和让出控制权,使得代码更加简洁和健壮。

英特尔AI工具
英特尔AI工具

英特尔AI与机器学习解决方案

下载
import asyncio, random 

async def wrapper(word: str):
    """模拟一个耗时操作的异步任务"""
    print(f"Executing task for: {word}")
    await asyncio.sleep(1) # 任务模拟
    print(f"Finished task for: {word}")

def generator():
    """一个无限生成随机字母的生成器"""
    abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
    while True:
        yield random.choice(abc)

async def manager_with_taskgroup():
    """
    负责从生成器获取任务并调度,使用asyncio.TaskGroup进行结构化并发管理。
    """
    print("Manager started with TaskGroup, generating tasks...")
    async with asyncio.TaskGroup() as tg: # 使用TaskGroup上下文管理器
        for i, letter in enumerate(generator()):
            tg.create_task(wrapper(letter)) # 在TaskGroup中创建任务
            print(f"Task {i+1} created for '{letter}'")
            # TaskGroup在内部会处理调度和让出控制权,通常无需额外的await asyncio.sleep(0)
            # 但如果生成任务的速度极快,且任务本身耗时很短,
            # 偶尔添加 await asyncio.sleep(0) 仍可能优化响应性。

            if i >= 10: # 示例:限制生成任务的数量
                print("Generated 10 tasks, stopping manager.")
                break
    print("TaskGroup exited. All tasks created within it should have completed or been cancelled.")

async def main_with_taskgroup():
    """主入口点,运行带有TaskGroup的manager"""
    await manager_with_taskgroup()

if __name__ == '__main__':
    print("\n--- Running Solution 2: Using asyncio.TaskGroup (Python 3.11+) ---")
    # 确保Python版本 >= 3.11
    if hasattr(asyncio, 'TaskGroup'):
        asyncio.run(main_with_taskgroup())
    else:
        print("Warning: asyncio.TaskGroup requires Python 3.11 or later. Skipping this example.")
    print("--- Solution 2 Finished ---")

TaskGroup 的优势:

  • 结构化并发 (Structured Concurrency):所有在 TaskGroup 中创建的任务都在其作用域内管理。当 TaskGroup 退出时,它会等待所有子任务完成,或者在发生异常时优雅地取消它们。这极大地简化了错误处理和资源管理。
  • 隐式让出控制权:TaskGroup 的实现通常会确保事件循环得到足够的调度机会,减少了手动 await asyncio.sleep(0) 的必要性。
  • 代码清晰:通过上下文管理器,任务的生命周期和相互关系变得一目了然。

版本要求: asyncio.TaskGroup 需要 Python 3.11 或更高版本。对于旧版本Python,解决方案一仍然是可行的。

完整示例与最佳实践

结合上述两种方法,以下是一个更完整的示例,展示了如何从生成器高效地调度异步任务,并包含一些最佳实践的思考。我们优先推荐使用 TaskGroup。

import asyncio
import random
import time

async def process_item(item_id: int, data: str):
    """模拟一个异步处理任务,打印处理信息并模拟耗时"""
    start_time = time.time()
    print(f"[{item_id}] Processing item: '{data}'...")
    await asyncio.sleep(random.uniform(0.5, 2.0)) # 模拟随机耗时
    end_time = time.time()
    print(f"[{item_id}] Finished item: '{data}' in {end_time - start_time:.2f}s")

def item_generator(max_items: int = 20):
    """一个生成器,生成带ID的随机数据"""
    abc = 'abcdefghijklmnopqrstuvwxyz'
    for i in range(1, max_items + 1):
        yield i, random.choice(abc) * random.randint(3, 8) # 生成随机长度的字符串

async def task_dispatcher():
    """
    任务调度器,从生成器获取数据并创建异步任务。
    优先使用TaskGroup,如果不可用则回退到显式让出控制权。
    """
    print("--- Task Dispatcher Started ---")
    item_count = 0

    if hasattr(asyncio, 'TaskGroup'):
        print("Using asyncio.TaskGroup for task management.")
        async with asyncio.TaskGroup() as tg:
            for item_id, data in item_generator():
                tg.create_task(process_item(item_id, data))
                print(f"Dispatched task {item_id} for data '{data}'")
                item_count += 1
                # 即使使用TaskGroup,如果生成任务的速度远超任务执行速度,
                # 也可以考虑在此处加入一个短暂的await,以避免内存中积压过多未开始的任务。
                # 例如: if item_count % 5 == 0: await asyncio.sleep(0.01)
        print(f"--- TaskGroup Finished. All {item_count} tasks completed or cancelled. ---")
    else:
        print("asyncio.TaskGroup not available (Python < 3.11). Falling back to explicit yield.")
        loop = asyncio.get_event_loop()
        for item_id, data in item_generator():
            loop.create_task(process_item(item_id, data))
            print(f"Dispatched task {item_id} for data '{data}'")
            item_count += 1
            await asyncio.sleep(0) # 显式让出控制权
        print(f"--- Dispatcher Finished creating {item_count} tasks. Waiting for them to complete. ---")
        # 由于是手动创建任务且没有TaskGroup等待,需要额外等待所有任务完成
        await asyncio.sleep(3) # 粗略等待,实际应用中可能需要更精细的等待机制

async def main():
    """主程序入口"""
    await task_dispatcher()
    print("All dispatching and processing should be complete.")

if __name__ == '__main__':
    asyncio.run(main())

最佳实践:

  1. 选择合适的并发工具
    • Python 3.11+:优先使用 asyncio.TaskGroup,它提供了结构化并发的优势,简化了任务管理、错误处理和资源清理。
    • Python :使用 loop.create_task() 结合 await asyncio.sleep(0) 是实现非阻塞任务调度的有效方法。
  2. 流量控制与背压:如果任务生成器产生任务的速度远超事件循环处理任务的速度,可能会导致内存占用过高或系统负载过大。在这种情况下,需要引入流量控制机制,例如:
    • 信号量 (Semaphore):限制同时运行的并发任务数量。
    • 队列 (Queue):将任务参数放入 asyncio.Queue,由固定数量的消费者协程从队列中取出并执行任务。
    • 批处理:一次性生成并调度一批任务,然后等待这批任务完成或达到某个阈值后再生成下一批。
  3. 错误处理:在 TaskGroup 中,如果任何子任务抛出异常,TaskGroup 会捕获它并在退出上下文时重新抛出 ExceptionGroup(Python 3.11+)。这使得集中处理错误变得容易。对于手动 create_task 的情况,需要单独管理任务的异常,例如通过 task.add_done_callback() 或收集任务引用并在稍后 await 它们以捕获异常。
  4. 任务生命周期管理:如果需要取消正在运行的任务,或者获取任务的结果,需要保留 Task 对象的引用。TaskGroup 在退出时会自动处理取消和等待,但在更复杂的场景中,可能仍需手动管理任务引用。

总结

在 asyncio 中从任务生成器实现高效异步并发执行的核心在于理解事件循环的协作式调度机制。仅仅通过 create_task() 创建任务不足以实现并发,关键在于主调度逻辑必须周期性地让出控制权给事件循环。

  • 对于 Python 3.11 及更高版本,推荐使用 asyncio.TaskGroup,它提供了一种结构化、健壮且易于管理任务生命周期和错误处理的并发模式。
  • 对于 Python 3.10 及更低版本,await asyncio.sleep(0) 是一个有效的技巧,能够强制协程让出控制权,从而允许事件循环调度其他已创建的任务。

无论采用哪种方法,理解 await 的作用以及事件循环的工作原理,是构建高效、响应式 asyncio 应用程序的基础。在实际应用中,还需要结合流量控制、错误处理等机制,确保系统的稳定性和可扩展性。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

525

2023.08.10

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

14

2026.01.30

c++ 字符串格式化
c++ 字符串格式化

本专题整合了c++字符串格式化用法、输出技巧、实践等等内容,阅读专题下面的文章了解更多详细内容。

9

2026.01.30

java 字符串格式化
java 字符串格式化

本专题整合了java如何进行字符串格式化相关教程、使用解析、方法详解等等内容。阅读专题下面的文章了解更多详细教程。

12

2026.01.30

python 字符串格式化
python 字符串格式化

本专题整合了python字符串格式化教程、实践、方法、进阶等等相关内容,阅读专题下面的文章了解更多详细操作。

4

2026.01.30

java入门学习合集
java入门学习合集

本专题整合了java入门学习指南、初学者项目实战、入门到精通等等内容,阅读专题下面的文章了解更多详细学习方法。

20

2026.01.29

java配置环境变量教程合集
java配置环境变量教程合集

本专题整合了java配置环境变量设置、步骤、安装jdk、避免冲突等等相关内容,阅读专题下面的文章了解更多详细操作。

18

2026.01.29

java成品学习网站推荐大全
java成品学习网站推荐大全

本专题整合了java成品网站、在线成品网站源码、源码入口等等相关内容,阅读专题下面的文章了解更多详细推荐内容。

19

2026.01.29

Java字符串处理使用教程合集
Java字符串处理使用教程合集

本专题整合了Java字符串截取、处理、使用、实战等等教程内容,阅读专题下面的文章了解详细操作教程。

3

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号