0

0

掌握 Python 并发编程:利用先进技术提升性能

碧海醫心

碧海醫心

发布时间:2024-12-08 20:21:17

|

1089人浏览过

|

来源于dev.to

转载

掌握 python 并发编程:利用先进技术提升性能

python 的并发编程能力已经显着发展,为开发人员提供了编写高效、并行代码的强大工具。我花了相当多的时间探索这些先进技术,很高兴与您分享我的见解。

使用 asyncio 进行异步编程是 i/o 密集型任务的游戏规则改变者。它允许我们编写非阻塞代码,可以同时处理多个操作,而无需线程开销。下面是一个简单的示例,说明如何使用 asyncio 同时从多个 url 获取数据:

import asyncio
import aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    async with aiohttp.clientsession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        for url, result in zip(urls, results):
            print(f"content length of {url}: {len(result)}")

asyncio.run(main())

这段代码演示了我们如何创建多个协程来同时从不同的 url 获取数据。 asyncio.gather() 函数允许我们等待所有协程完成并收集它们的结果。

虽然 asyncio 非常适合 i/o 密集型任务,但它不适合 cpu 密集型操作。为此,我们转向concurrent.futures模块,它提供了threadpoolexecutor和processpoolexecutor。 threadpoolexecutor 非常适合不释放 gil 的 i/o 密集型任务,而 processpoolexecutor 非常适合 cpu 密集型任务。

下面是使用 threadpoolexecutor 并发下载多个文件的示例:

import concurrent.futures
import requests

def download_file(url):
    response = requests.get(url)
    filename = url.split('/')[-1]
    with open(filename, 'wb') as f:
        f.write(response.content)
    return f"downloaded {filename}"

urls = [
    'https://example.com/file1.pdf',
    'https://example.com/file2.pdf',
    'https://example.com/file3.pdf'
]

with concurrent.futures.threadpoolexecutor(max_workers=3) as executor:
    future_to_url = {executor.submit(download_file, url): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except exception as exc:
            print(f"{url} generated an exception: {exc}")
        else:
            print(data)

此代码创建一个包含三个工作线程的线程池,并为每个 url 提交一个下载任务。 as_completed() 函数允许我们在结果可用时对其进行处理,而不是等待所有任务完成。

立即学习Python免费学习笔记(深入)”;

对于 cpu 密集型任务,我们可以使用 processpoolexecutor 来利用多个 cpu 核心。这是并行计算素数的示例:

import concurrent.futures
import math

def is_prime(n):
    if n < 2:
        return false
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0:
            return false
    return true

def find_primes(start, end):
    return [n for n in range(start, end) if is_prime(n)]

ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]

with concurrent.futures.processpoolexecutor() as executor:
    results = executor.map(lambda r: find_primes(*r), ranges)

all_primes = [prime for sublist in results for prime in sublist]
print(f"found {len(all_primes)} prime numbers")

此代码将查找素数的任务分为四个范围,并使用单独的 python 进程并行处理它们。 map() 函数将 find_primes() 函数应用于每个范围并收集结果。

当使用多个进程时,我们经常需要在它们之间共享数据。多处理模块为此提供了多种选项,包括共享内存和队列。这是使用共享内存数组的示例:

from multiprocessing import process, array
import numpy as np

def worker(shared_array, start, end):
    for i in range(start, end):
        shared_array[i] = i * i

if __name__ == '__main__':
    size = 10000000
    shared_array = array('d', size)

    # create 4 processes
    processes = []
    chunk_size = size // 4
    for i in range(4):
        start = i * chunk_size
        end = start + chunk_size if i < 3 else size
        p = process(target=worker, args=(shared_array, start, end))
        processes.append(p)
        p.start()

    # wait for all processes to finish
    for p in processes:
        p.join()

    # convert shared array to numpy array for easy manipulation
    np_array = np.frombuffer(shared_array.get_obj())
    print(f"sum of squares: {np_array.sum()}")

此代码创建一个共享内存数组,并使用四个进程并行计算数字的平方。共享数组允许所有进程写入相同的内存空间,避免了进程间通信的需要。

虽然这些技术很强大,但它们也面临着一系列挑战。竞争条件、死锁和过多的上下文切换都会影响性能和正确性。仔细设计并发代码并在必要时使用适当的同步原语至关重要。

例如,当多个线程或进程需要访问共享资源时,我们可以使用lock来保证线程安全:

from threading import lock, thread

class counter:
    def __init__(self):
        self.count = 0
        self.lock = lock()

    def increment(self):
        with self.lock:
            self.count += 1

def worker(counter, num_increments):
    for _ in range(num_increments):
        counter.increment()

counter = counter()
threads = []
for _ in range(10):
    t = thread(target=worker, args=(counter, 100000))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print(f"final count: {counter.count}")

此代码演示了当多个线程同时递增共享计数器时,如何使用锁来保护共享计数器免受竞争条件的影响。

PHPCMS V9
PHPCMS V9

PHPCMS V9(后面简称V9)采用PHP5+MYSQL做为技术基础进行开发。V9采用OOP(面向对象编程)+ MVC设计模式,进行基础运行框架搭建。模块化开发方式做为功能开发形式。框架易于功能扩展,代码维护,优秀的二次开发能力,可满足所有网站的应用需求。 5年开发经验的优秀团队,在掌握了丰富的WEB开发经验和CMS产品开发经验的同时,勇于创新追求完美的设计理念,为全球多达10万网站提供助力,并

下载

另一种先进技术是使用信号量来控制对有限资源的访问。下面是限制并发网络连接数的示例:

import asyncio
import aiohttp
from asyncio import semaphore

async def fetch_url(url, semaphore):
    async with semaphore:
        async with aiohttp.clientsession() as session:
            async with session.get(url) as response:
                return await response.text()

async def main():
    urls = [f'http://example.com/{i}' for i in range(100)]
    semaphore = semaphore(10)  # limit to 10 concurrent connections
    tasks = [fetch_url(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"fetched {len(results)} urls")

asyncio.run(main())

此代码使用信号量将并发网络连接数限制为 10,防止网络或服务器不堪重负。

使用并发代码时,正确处理异常也很重要。 asyncio 模块为 asyncio.gather() 函数提供了一个 return_exceptions 参数,该参数对此很有用:

import asyncio

async def risky_operation(i):
    if i % 2 == 0:
        raise valueerror(f"even number not allowed: {i}")
    await asyncio.sleep(1)
    return i

async def main():
    tasks = [risky_operation(i) for i in range(10)]
    results = await asyncio.gather(*tasks, return_exceptions=true)
    for result in results:
        if isinstance(result, exception):
            print(f"got an exception: {result}")
        else:
            print(f"got a result: {result}")

asyncio.run(main())

此代码演示了如何在不停止其他任务执行的情况下处理并发任务中的异常。

随着我们深入研究并发编程,我们会遇到更高级的概念,例如事件循环和协程链。这是一个演示如何链接协程的示例:

import asyncio

async def fetch_data(url):
    print(f"fetching data from {url}")
    await asyncio.sleep(2)  # simulate network delay
    return f"data from {url}"

async def process_data(data):
    print(f"processing {data}")
    await asyncio.sleep(1)  # simulate processing time
    return f"processed {data}"

async def save_result(result):
    print(f"saving {result}")
    await asyncio.sleep(0.5)  # simulate saving delay
    return f"saved {result}"

async def fetch_process_save(url):
    data = await fetch_data(url)
    processed = await process_data(data)
    return await save_result(processed)

async def main():
    urls = ['http://example.com', 'http://example.org', 'http://example.net']
    tasks = [fetch_process_save(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

此代码链接了三个协程(fetch_data、process_data 和 save_result),为每个 url 创建一个管道。然后 asyncio.gather() 函数同时运行这些管道。

在处理长时间运行的任务时,通常需要实现取消和超时机制。这是一个演示两者的示例:

import asyncio

async def long_running_task(n):
    print(f"Starting long task {n}")
    try:
        await asyncio.sleep(10)
        print(f"Task {n} completed")
        return n
    except asyncio.CancelledError:
        print(f"Task {n} was cancelled")
        raise

async def main():
    tasks = [long_running_task(i) for i in range(5)]
    try:
        results = await asyncio.wait_for(asyncio.gather(*tasks), timeout=5)
    except asyncio.TimeoutError:
        print("Operation timed out, cancelling remaining tasks")
        for task in tasks:
            task.cancel()
        # Wait for all tasks to finish (they'll raise CancelledError)
        await asyncio.gather(*tasks, return_exceptions=True)
    else:
        print(f"All tasks completed successfully: {results}")

asyncio.run(main())

此代码启动五个长时间运行的任务,但设置所有任务的超时时间为 5 秒才能完成。如果达到超时,则会取消所有剩余任务。

总之,python 的并发编程功能为编写高效的并行代码提供了广泛的工具和技术。从使用 asyncio 的异步编程到 cpu 密集型任务的多处理,这些先进技术可以显着提高应用程序的性能。然而,了解基本概念、为每项任务选择正确的工具以及仔细管理共享资源和潜在的竞争条件至关重要。通过实践和精心设计,我们可以利用 python 中并发编程的全部功能来构建快速、可扩展且响应迅速的应用程序。


我们的创作

一定要看看我们的创作:

投资者中心 | 智能生活 | 时代与回声 | 令人费解的谜团 | 印度教 | 精英开发 | js学校


我们在媒体上

科技考拉洞察 | 时代与回响世界 | 投资者中央媒体 | 令人费解的谜团 | 科学与时代媒介 | 现代印度教

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

523

2023.08.10

golang map内存释放
golang map内存释放

本专题整合了golang map内存相关教程,阅读专题下面的文章了解更多相关内容。

75

2025.09.05

golang map相关教程
golang map相关教程

本专题整合了golang map相关教程,阅读专题下面的文章了解更多详细内容。

36

2025.11.16

golang map原理
golang map原理

本专题整合了golang map相关内容,阅读专题下面的文章了解更多详细内容。

61

2025.11.17

java判断map相关教程
java判断map相关教程

本专题整合了java判断map相关教程,阅读专题下面的文章了解更多详细内容。

42

2025.11.27

js正则表达式
js正则表达式

php中文网为大家提供各种js正则表达式语法大全以及各种js正则表达式使用的方法,还有更多js正则表达式的相关文章、相关下载、相关课程,供大家免费下载体验。

515

2023.06.20

js获取当前时间
js获取当前时间

JS全称JavaScript,是一种具有函数优先的轻量级,解释型或即时编译型的编程语言;它是一种属于网络的高级脚本语言,主要用于Web,常用来为网页添加各式各样的动态功能。js怎么获取当前时间呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

244

2023.07.28

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

320

2023.08.03

C++ 设计模式与软件架构
C++ 设计模式与软件架构

本专题深入讲解 C++ 中的常见设计模式与架构优化,包括单例模式、工厂模式、观察者模式、策略模式、命令模式等,结合实际案例展示如何在 C++ 项目中应用这些模式提升代码可维护性与扩展性。通过案例分析,帮助开发者掌握 如何运用设计模式构建高质量的软件架构,提升系统的灵活性与可扩展性。

14

2026.01.30

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.4万人学习

Django 教程
Django 教程

共28课时 | 3.6万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号