0

0

Python asyncio并发任务的超时控制与优雅关闭

DDD

DDD

发布时间:2025-07-22 14:52:21

|

924人浏览过

|

来源于php中文网

原创

python asyncio并发任务的超时控制与优雅关闭

本文探讨了在Python asyncio中如何有效管理可能长时间阻塞的并发任务,并实现整体操作的超时控制。针对asyncio.gather在特定场景下的局限性,重点介绍了asyncio.wait方法,它允许设定超时时间,并能区分已完成和未完成的任务,从而实现对未完成任务的优雅取消,确保程序按预期及时终止。

并发任务管理中的挑战

在asyncio异步编程中,我们经常需要同时运行多个并发任务。asyncio.gather是一个常用的工具,它能够并发地运行多个协程,并等待它们全部完成。然而,当某些任务可能因为等待外部事件(如网络数据、消息队列消息)而长时间阻塞,甚至无限期地不返回时,asyncio.gather的默认行为就显得力不从心了。它会一直等待所有任务完成,导致整个程序无法在预设的时间内退出,即使我们通过某种机制(如设置全局标志)尝试通知任务停止,如果任务内部的await操作本身是阻塞的,任务也无法及时响应停止信号。

asyncio.wait:实现精确超时控制

为了解决上述问题,asyncio提供了更为灵活的asyncio.wait方法。与asyncio.gather不同,asyncio.wait允许我们为一组并发任务设置一个整体的超时时间。它不会等待所有任务完成,而是在达到指定超时时间后立即返回,并告知哪些任务已完成,哪些仍在等待中。

asyncio.wait函数的基本签名如下: asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)

  • aws: 一个可迭代对象,包含要等待的Future或协程对象。
  • timeout: 可选参数,指定等待的最长时间(秒)。如果在此时间内所有任务未能完成,wait将提前返回。
  • return_when: 可选参数,指定何时返回。默认为ALL_COMPLETED(所有任务完成或超时),其他选项包括FIRST_COMPLETED(第一个任务完成时返回)和FIRST_EXCEPTION(第一个任务抛出异常时返回)。

asyncio.wait返回两个集合:done和pending。

  • done: 包含在等待期间已完成(正常完成或抛出异常)的任务。
  • pending: 包含在等待期间尚未完成的任务。

以下是一个示例,展示如何使用asyncio.wait来管理带有超时限制的并发任务:

立即学习Python免费学习笔记(深入)”;

import asyncio
import time

# 模拟长时间运行或阻塞的网络I/O任务
async def watch_task_data():
    print(f"[{time.time():.2f}] watch_task_data: 启动,模拟等待数据...")
    try:
        # 模拟长时间等待网络数据,可能永不返回
        await asyncio.sleep(100) # 模拟阻塞100秒
        print(f"[{time.time():.2f}] watch_task_data: 收到数据并完成。")
    except asyncio.CancelledError:
        print(f"[{time.time():.2f}] watch_task_data: 任务被取消。")
    except Exception as e:
        print(f"[{time.time():.2f}] watch_task_data: 发生异常: {e}")
    finally:
        print(f"[{time.time():.2f}] watch_task_data: 结束。")

async def watch_task_news():
    print(f"[{time.time():.2f}] watch_task_news: 启动,模拟等待新闻...")
    try:
        # 模拟另一个长时间等待新闻的任务
        await asyncio.sleep(100) # 模拟阻塞100秒
        print(f"[{time.time():.2f}] watch_task_news: 收到新闻并完成。")
    except asyncio.CancelledError:
        print(f"[{time.time():.2f}] watch_task_news: 任务被取消。")
    except Exception as e:
        print(f"[{time.time():.2f}] watch_task_news: 发生异常: {e}")
    finally:
        print(f"[{time.time():.2f}] watch_task_news: 结束。")

async def main():
    tasks = [
        watch_task_data(),
        watch_task_news(),
    ]

    print(f"[{time.time():.2f}] 主程序:开始等待任务,最长等待5秒...")
    # 设置整体超时为5秒
    done, pending = await asyncio.wait(tasks, timeout=5)

    print(f"[{time.time():.2f}] 主程序:等待结束。已完成任务数: {len(done)}, 未完成任务数: {len(pending)}")

    # 1. 处理已完成的任务 (done 集合)
    # 这些任务可能已正常完成,也可能在超时前抛出异常
    for task in done:
        try:
            result = task.result() # 获取任务结果,如果任务抛出异常,这里会重新抛出
            print(f"[{time.time():.2f}] 已完成任务结果: {result if result is not None else '无'}")
        except asyncio.CancelledError:
            # 理论上,done集合中的任务不应该被取消,除非在wait返回前被外部取消
            print(f"[{time.time():.2f}] 已完成任务被取消 (异常情况)")
        except Exception as e:
            print(f"[{time.time():.2f}] 已完成任务发生异常: {e}")

    # 2. 处理未完成的任务 (pending 集合)
    # 这些任务在超时时仍未完成,通常需要显式取消它们以释放资源
    for task in pending:
        print(f"[{time.time():.2f}] 主程序:正在取消未完成任务: {task.get_name() if hasattr(task, 'get_name') else task}")
        task.cancel() # 发送取消信号

        try:
            # 推荐等待任务真正结束,以确保任务有机会执行清理逻辑
            await task
        except asyncio.CancelledError:
            print(f"[{time.time():.2f}] 未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 已确认取消。")
        except Exception as e:
            print(f"[{time.time():.2f}] 取消未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 时发生异常: {e}")

    print(f"[{time.time():.2f}] 主程序:所有任务处理完毕。")

if __name__ == "__main__":
    asyncio.run(main())

运行上述代码,你会发现尽管watch_task_data和watch_task_news内部模拟了100秒的阻塞,但整个main函数会在大约5秒后退出,并且会打印出任务被取消的信息。

BGremover
BGremover

VanceAI推出的图片背景移除工具

下载

优雅地处理已完成与未完成任务

在使用asyncio.wait后,正确处理done和pending集合至关重要:

  1. 处理 done 集合: 对于done集合中的每个任务,你可以调用task.result()来获取任务的返回值。如果任务在执行过程中抛出了异常,task.result()会重新抛出该异常,因此需要使用try...except块来捕获并处理。

  2. 处理 pending 集合: pending集合中的任务是在超时时仍未完成的任务。为了避免资源泄露或程序僵死,通常需要显式地取消这些任务。

    • task.cancel(): 调用task.cancel()会向目标任务发送一个asyncio.CancelledError异常。这个异常会在任务内部下一个await点被抛出。
    • 任务内部的响应: 被取消的任务必须能够捕获asyncio.CancelledError,并在其except块中执行必要的资源清理工作(例如关闭文件、网络连接等)。如果任务不处理CancelledError,它将向上冒泡,可能导致程序崩溃或行为异常。
    • await task: 在调用task.cancel()之后,最佳实践是再次await task。这样做可以确保任务有机会完成其清理逻辑并真正终止。如果任务内部正确处理了CancelledError,那么await task将再次抛出CancelledError,我们可以捕获它。

替代方案:asyncio.wait_for

除了asyncio.wait,asyncio.wait_for也是一个有用的工具,它用于为单个协程或Future设置超时。如果指定的协程在超时时间内没有完成,asyncio.wait_for会取消该协程并抛出asyncio.TimeoutError。

示例:

async def limited_task():
    try:
        await asyncio.wait_for(some_long_running_coroutine(), timeout=10)
        print("limited_task: 任务在10秒内完成。")
    except asyncio.TimeoutError:
        print("limited_task: 任务超时。")

asyncio.wait_for适用于只需要对特定单个任务进行超时控制的场景,而asyncio.wait则更适合对一组任务进行整体超时管理。

注意事项与最佳实践

  1. 任务内部的取消处理: 任何可能被取消的异步任务都应该在其内部的try...except asyncio.CancelledError块中实现资源清理逻辑。这是异步编程中确保健壮性的关键一环。
  2. 资源清理: 无论是正常完成还是被取消,确保任务所持有的所有外部资源(如数据库连接、文件句柄、网络套接字等)都能被妥善关闭和释放。finally块是执行清理操作的理想位置。
  3. 异常处理: 当从done集合中的任务获取结果时,务必捕获task.result()可能抛出的任何异常。
  4. asyncio.gather与asyncio.wait的选择:
    • 如果需要等待所有任务完成,并且不需要整体超时控制,或者希望所有任务的异常都被聚合抛出,可以使用asyncio.gather(配合return_exceptions=True可以避免单个任务异常导致整个gather失败)。
    • 如果需要对一组任务设置整体超时,并希望在超时后能够处理已完成和未完成的任务,那么asyncio.wait是更合适的选择。

总结

在asyncio应用中,有效管理并发任务的生命周期,特别是处理可能无限期阻塞的任务并实施超时控制,是构建稳定、响应迅速系统的关键。asyncio.wait提供了强大的能力,允许开发者精确控制任务组的等待行为,区分已完成和未完成的任务,并通过显式取消机制优雅地终止长时间运行的任务。结合任务内部对CancelledError的妥善处理,我们可以确保即使在复杂的异步场景下,程序也能按预期及时响应并释放资源。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据库三范式
数据库三范式

数据库三范式是一种设计规范,用于规范化关系型数据库中的数据结构,它通过消除冗余数据、提高数据库性能和数据一致性,提供了一种有效的数据库设计方法。本专题提供数据库三范式相关的文章、下载和课程。

360

2023.06.29

如何删除数据库
如何删除数据库

删除数据库是指在MySQL中完全移除一个数据库及其所包含的所有数据和结构,作用包括:1、释放存储空间;2、确保数据的安全性;3、提高数据库的整体性能,加速查询和操作的执行速度。尽管删除数据库具有一些好处,但在执行任何删除操作之前,务必谨慎操作,并备份重要的数据。删除数据库将永久性地删除所有相关数据和结构,无法回滚。

2082

2023.08.14

vb怎么连接数据库
vb怎么连接数据库

在VB中,连接数据库通常使用ADO(ActiveX 数据对象)或 DAO(Data Access Objects)这两个技术来实现:1、引入ADO库;2、创建ADO连接对象;3、配置连接字符串;4、打开连接;5、执行SQL语句;6、处理查询结果;7、关闭连接即可。

349

2023.08.31

MySQL恢复数据库
MySQL恢复数据库

MySQL恢复数据库的方法有使用物理备份恢复、使用逻辑备份恢复、使用二进制日志恢复和使用数据库复制进行恢复等。本专题为大家提供MySQL数据库相关的文章、下载、课程内容,供大家免费下载体验。

256

2023.09.05

vb中怎么连接access数据库
vb中怎么连接access数据库

vb中连接access数据库的步骤包括引用必要的命名空间、创建连接字符串、创建连接对象、打开连接、执行SQL语句和关闭连接。本专题为大家提供连接access数据库相关的文章、下载、课程内容,供大家免费下载体验。

326

2023.10.09

数据库对象名无效怎么解决
数据库对象名无效怎么解决

数据库对象名无效解决办法:1、检查使用的对象名是否正确,确保没有拼写错误;2、检查数据库中是否已存在具有相同名称的对象,如果是,请更改对象名为一个不同的名称,然后重新创建;3、确保在连接数据库时使用了正确的用户名、密码和数据库名称;4、尝试重启数据库服务,然后再次尝试创建或使用对象;5、尝试更新驱动程序,然后再次尝试创建或使用对象。

412

2023.10.16

vb连接access数据库的方法
vb连接access数据库的方法

vb连接access数据库方法:1、使用ADO连接,首先导入System.Data.OleDb模块,然后定义一个连接字符串,接着创建一个OleDbConnection对象并使用Open() 方法打开连接;2、使用DAO连接,首先导入 Microsoft.Jet.OLEDB模块,然后定义一个连接字符串,接着创建一个JetConnection对象并使用Open()方法打开连接即可。

411

2023.10.16

vb连接数据库的方法
vb连接数据库的方法

vb连接数据库的方法有使用ADO对象库、使用OLEDB数据提供程序、使用ODBC数据源等。详细介绍:1、使用ADO对象库方法,ADO是一种用于访问数据库的COM组件,可以通过ADO连接数据库并执行SQL语句。可以使用ADODB.Connection对象来建立与数据库的连接,然后使用ADODB.Recordset对象来执行查询和操作数据;2、使用OLEDB数据提供程序方法等等。

223

2023.10.19

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

14

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号