0

0

Python并发编程:asyncio与threading协同实现同步任务并行化

碧海醫心

碧海醫心

发布时间:2025-10-30 11:38:35

|

161人浏览过

|

来源于php中文网

原创

Python并发编程:asyncio与threading协同实现同步任务并行化

本文探讨python中如何将同步阻塞函数与异步协程任务并行执行。通过分析`asyncio`事件循环的特性,我们揭示了直接调用同步函数会阻塞事件循环的问题。核心解决方案是利用`asyncio.run_in_executor`将同步任务提交到独立的线程池中执行,从而实现与异步任务的并发运行,有效提升应用程序的响应性和吞吐量,尤其适用于处理i/o密集型或cpu密集型同步操作。

理解asyncio中的并发与阻塞

在Python的asyncio框架中,协程(coroutine)是实现并发的主要机制。一个asyncio事件循环在单线程中运行,通过协作式多任务处理(cooperative multitasking)来实现并发。这意味着当一个协程遇到await表达式时,它会暂停执行并将控制权交还给事件循环,允许其他协程运行。然而,如果一个协程内部调用了一个耗时且不包含await的同步阻塞函数,那么整个事件循环将会被阻塞,直到该同步函数执行完毕,其他所有协程都无法运行。

考虑以下原始代码示例:

import asyncio
import threading

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    lock = threading.Lock()
    lock.acquire()
    print(f"do_sync: 开始处理 {number}")
    result = number + 10
    print(f"do_sync: 完成处理 {number}")
    lock.release()
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number}")
    # 模拟异步I/O操作
    await asyncio.sleep(0.1) 
    result = number + 10
    print(f"do_async: 完成处理 {number}")
    return result

async def main():
    # 这里的max函数会先完全执行do_sync(1),然后才await do_async(2)
    # 它们是顺序执行的,不是并行
    result = max(do_sync(1), await do_async(2))
    print(f"最终结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())

在这段代码中,main函数内的max(do_sync(1), await do_async(2))表达式会首先完全执行do_sync(1)。由于do_sync是一个普通的同步函数,它会立即运行并阻塞当前线程,直到其返回结果。只有当do_sync(1)执行完毕后,await do_async(2)才会被调度执行。因此,这两个函数是顺序执行的,并未实现真正的并发。

利用 asyncio.run_in_executor 实现同步任务并行化

为了让同步阻塞函数与异步协程任务并行运行,我们需要将同步函数从asyncio事件循环的主线程中“卸载”到另一个独立的线程或进程中执行。asyncio为此提供了loop.run_in_executor方法。

立即学习Python免费学习笔记(深入)”;

run_in_executor允许我们将一个普通的同步函数提交到一个执行器(executor)中运行。默认情况下,asyncio使用ThreadPoolExecutor作为执行器,这意味着同步函数将在一个单独的线程中运行。这样,asyncio事件循环可以继续处理其他协程任务,而无需等待同步函数完成。

run_in_executor的基本用法如下:

await loop.run_in_executor(executor, func, *args)
  • executor: 可以是一个concurrent.futures.ThreadPoolExecutor或concurrent.futures.ProcessPoolExecutor实例,或者None(默认使用ThreadPoolExecutor)。
  • func: 要在执行器中运行的同步函数。
  • *args: 传递给func的参数。

run_in_executor会返回一个Future对象,我们可以await这个Future来等待同步函数的执行结果。

示例代码与解析

现在,我们使用run_in_executor来改造main函数,使do_sync与do_async能够并行运行:

Bandy AI
Bandy AI

全球领先的电商设计Agent

下载
import asyncio
import threading
import time # 引入time模块用于模拟耗时操作

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    # 在run_in_executor场景下,如果do_sync内部需要线程安全,则仍需加锁
    # lock = threading.Lock() # 每次调用都会创建新锁,这里不合适,应在外部管理或无需
    print(f"do_sync: 开始处理 {number} 在线程 {threading.current_thread().name}")
    time.sleep(1) # 模拟一个耗时1秒的同步操作
    result = number + 10
    print(f"do_sync: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number} 在线程 {threading.current_thread().name}")
    await asyncio.sleep(0.5) # 模拟一个耗时0.5秒的异步I/O操作
    result = number + 10
    print(f"do_async: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def main():
    loop = asyncio.get_running_loop()

    # 使用run_in_executor将do_sync提交到线程池执行
    # 注意:do_sync_future是一个Future对象,它代表do_sync的未来结果
    do_sync_future = loop.run_in_executor(None, do_sync, 1)

    # do_async是一个awaitable对象
    do_async_task = do_async(2)

    # 使用asyncio.gather等待这两个任务并行完成
    # asyncio.gather会等待所有给定的awaitable对象完成
    sync_result, async_result = await asyncio.gather(do_sync_future, do_async_task)

    final_max_result = max(sync_result, async_result)
    print(f"同步任务结果: {sync_result}")
    print(f"异步任务结果: {async_result}")
    print(f"最终最大结果: {final_max_result}")

if __name__ == "__main__":
    start_time = time.monotonic()
    asyncio.run(main())
    end_time = time.monotonic()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

代码解析:

  1. 获取事件循环: loop = asyncio.get_running_loop() 获取当前正在运行的事件循环实例。
  2. 提交同步任务: do_sync_future = loop.run_in_executor(None, do_sync, 1) 将do_sync(1)函数提交到默认的线程池中执行。None表示使用默认的ThreadPoolExecutor。run_in_executor立即返回一个Future对象,而不是阻塞。
  3. 创建异步任务: do_async_task = do_async(2) 创建一个协程对象,它是一个可等待(awaitable)对象。
  4. 并行等待: await asyncio.gather(do_sync_future, do_async_task) 是关键。asyncio.gather允许我们同时等待多个可等待对象。在这种情况下,do_sync_future会在一个单独的线程中执行,而do_async_task则在事件循环的主线程中作为协程执行。它们会并行地运行,等待时间由最长的那个任务决定。
  5. 结果处理: 当asyncio.gather返回时,它会按顺序返回各个任务的结果。

运行上述代码,你会观察到do_sync和do_async的打印输出几乎同时开始,并且总耗时约为1秒(取决于do_sync的time.sleep时间),而不是1.5秒(1秒 + 0.5秒),这证明了它们是并行执行的。

注意事项

  1. 选择合适的执行器:

    • ThreadPoolExecutor (默认): 适用于I/O密集型同步任务(如网络请求、文件读写)。它将阻塞操作放到单独的线程中,释放GIL,允许Python解释器在其他线程中运行。
    • ProcessPoolExecutor: 适用于CPU密集型同步任务。它将任务放到单独的进程中,可以绕过GIL的限制,真正实现CPU层面的并行计算。使用时需要注意进程间通信的开销以及数据序列化问题。
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    # 自定义线程池
    with ThreadPoolExecutor(max_workers=5) as thread_pool:
        do_sync_future = loop.run_in_executor(thread_pool, do_sync, 1)
    
    # 自定义进程池
    with ProcessPoolExecutor(max_workers=2) as process_pool:
        do_sync_future = loop.run_in_executor(process_pool, do_sync_cpu_bound, 1)
  2. 共享状态与线程安全: 如果提交给run_in_executor的同步函数需要访问或修改共享数据(如全局变量、类属性),则必须使用线程锁(threading.Lock)或其他同步原语来确保线程安全,以避免数据竞争问题。在示例中的do_sync函数,由于其内部操作是独立的,不涉及共享状态,因此无需外部加锁。

  3. 错误处理: run_in_executor返回的Future对象在被await时,如果其内部的同步函数抛出异常,该异常会被重新抛出。因此,可以使用try...except块来捕获和处理这些异常。

  4. 避免过度使用: run_in_executor引入了线程/进程管理的开销。对于非常轻量级的同步操作,直接在事件循环中执行可能比使用执行器更高效。它主要用于处理那些确实会阻塞事件循环的耗时同步任务。

总结

通过asyncio.run_in_executor,Python的asyncio框架能够优雅地与传统的同步阻塞代码以及多线程/多进程模型结合。它提供了一种强大的机制,允许开发者在保持事件循环响应性的同时,处理耗时的同步I/O或CPU密集型任务。掌握这一技术对于构建高性能、高并发的Python应用程序至关重要,它弥合了协程的非阻塞特性与传统同步操作之间的鸿沟,使得Python开发者能够充分利用现代多核处理器和异步编程范式。

相关文章

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
全局变量怎么定义
全局变量怎么定义

本专题整合了全局变量相关内容,阅读专题下面的文章了解更多详细内容。

81

2025.09.18

python 全局变量
python 全局变量

本专题整合了python中全局变量定义相关教程,阅读专题下面的文章了解更多详细内容。

96

2025.09.18

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

503

2023.08.10

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

186

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

15

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

15

2026.01.21

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

186

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

15

2026.01.21

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

0

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.7万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号