0

0

优化asyncio中嵌套异步任务的并发调度

聖光之護

聖光之護

发布时间:2025-11-27 11:37:19

|

491人浏览过

|

来源于php中文网

原创

优化asyncio中嵌套异步任务的并发调度

本文探讨了在`asyncio`中处理嵌套异步生成器时,如何通过传统`await`模式导致的串行执行问题。针对`await`的阻塞特性,文章提出并详细阐述了利用`asyncio.queue`和`asyncio.event`构建生产者-消费者模式的解决方案,从而实现任务间的解耦和真正的并发执行,显著提升异步应用的效率和响应性。

理解asyncio中的await与并发限制

在asyncio编程中,await关键字是调度协程的核心机制。当一个协程遇到await表达式时,它会暂停自身的执行,将控制权交还给事件循环,并等待被await的协程完成。一旦被await的协程完成并返回结果,原协程才会从暂停点继续执行。这种机制虽然实现了协作式多任务,但如果设计不当,也可能导致非预期的串行执行。

考虑以下场景:一个异步任务(main)需要从一个异步生成器(sentences_generator)获取数据,然后将数据传递给另一个异步任务(process_sentence)进行处理。如果main函数在每次获取到数据后,都直接await process_sentence的完成,那么在process_sentence执行期间,sentences_generator将无法继续生成新的数据。这违背了我们期望的并发处理,即当process_sentence在处理当前数据时,sentences_generator应该能够同时准备下一批数据。

以下是原始代码示例及其输出,展示了这种串行阻塞行为:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1) # 模拟耗时处理
    print("sentence processed!")

async def main():
    i=0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里的await导致阻塞
        i += 1

# asyncio.run(main())

原始输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: y
got char: .
got sentence:   Hey.
processing sentence:  1
waiting for processing sentence:   Hey.
sentence processed!
...

从输出可以看出,只有当process_sentence完全处理完一个句子后,stream和sentences_generator才能继续生成下一个字符和句子。这并不是我们期望的并发效果。

解决方案:使用asyncio.Queue实现生产者-消费者模式

为了实现真正的并发,我们需要解耦数据的生产和消费过程,使它们能够独立运行。asyncio.Queue是实现这种生产者-消费者模式的理想工具

核心思想:

BlackBox AI
BlackBox AI

AI编程助手,智能对话问答助手

下载
  1. 生产者(Producer):一个或多个异步任务负责生成数据,并将数据放入asyncio.Queue中。
  2. 消费者(Consumer):一个或多个异步任务从asyncio.Queue中取出数据进行处理。
  3. 独立运行:生产者和消费者作为独立的协程,由asyncio事件循环调度,它们之间通过队列进行通信,互不阻塞。

此外,为了实现优雅的关闭和通知消费者数据已全部生产完毕,我们可以引入asyncio.Event。生产者在完成所有数据生产后设置Event,消费者则可以结合队列是否为空和Event状态来判断何时停止。

优化后的代码实现

我们将修改sentences_generator作为生产者,将生成的句子放入队列;process_sentence作为消费者,从队列中取出句子进行处理。main函数将负责启动这两个独立的协程。

import asyncio

# 定义全局变量用于计数,方便观察
i = 1

async def stream():
    char_string = "Hi. Hello. Thank you." # 增加一些内容以更好地展示并发
    for char in char_string:
        await asyncio.sleep(0.1) # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者协程:从字符流生成句子,并放入队列。
    当所有句子生成完毕后,设置flag通知消费者。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将生成的句子放入队列
            sentence = ""
    # 确保最后一个不以标点符号结尾的句子也被处理(如果需要)
    if sentence:
        print("got sentence: ", sentence)
        await q.put(sentence)
    flag.set() # 生产完毕,设置事件标志

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者协程:从队列中获取句子并进行处理。
    当队列为空且生产者已设置flag时,停止消费。
    """
    global i
    while True:
        # 检查是否应该停止:队列为空且生产者已完成
        if q.empty() and flag.is_set():
            break

        # 尝试从队列获取项目,如果队列为空则等待
        item = await q.get()

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1) # 模拟耗时处理
        print("sentence processed!")

        q.task_done() # 通知队列此任务已完成
        i += 1

async def main():
    global i
    i = 1 # 重置计数器
    event = asyncio.Event() # 用于生产者通知消费者结束
    queue = asyncio.Queue[str]() # 生产者和消费者之间的通信队列

    # 启动生产者和消费者作为独立的协程任务
    producer_task = asyncio.create_task(sentences_generator(queue, event))
    consumer_task = asyncio.create_task(process_sentence(queue, event))

    # 等待所有任务完成
    await asyncio.gather(producer_task, consumer_task)

    # 可选:等待队列中所有任务被标记为完成,确保所有数据都被处理
    await queue.join()

asyncio.run(main())

预期输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
got char:  
got char: H
processing sentence:  1
waiting for processing sentence:  Hi.
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
sentence processed!
got char:  
got char: T
processing sentence:  2
waiting for processing sentence:   Hello.
got char: h
got char: a
got char: n
got char: k
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:  Thank you.
sentence processed!
processing sentence:  3
waiting for processing sentence:  Thank you.
sentence processed!

从这个输出可以看出,当process_sentence正在处理第一个句子时,stream和sentences_generator已经继续生成了后续的字符和句子,并将其放入队列。这正是我们期望的并发行为。

关键点和注意事项

  1. asyncio.Queue的作用:它提供了一个线程安全的(在asyncio中是协程安全的)FIFO队列。put()操作在队列满时会暂停,get()操作在队列空时会暂停,直到有新的数据可用。
  2. asyncio.Event的作用:它是一个简单的同步原语,用于一个协程通知另一个协程某个事件已经发生。生产者在完成所有数据生产后调用flag.set(),消费者则通过flag.is_set()来检查生产者的状态。
  3. asyncio.gather():用于并发运行多个协程或任务,并等待它们全部完成。
  4. q.task_done() 和 q.join()
    • q.task_done():消费者在完成对从队列中获取的项目的处理后调用,通知队列该项目已处理完毕。
    • q.join():main函数可以调用await queue.join()来等待队列中所有项目都被get并task_done。这确保了在程序退出前所有数据都已得到处理。在我们的示例中,虽然gather已经等待了所有协程,但queue.join()提供了一个更明确的机制来等待所有队列中的工作完成。
  5. 消费者退出条件:消费者协程的退出逻辑至关重要。一个常见的模式是while True循环,内部判断q.empty() and flag.is_set()来决定是否退出。这确保了在生产者完成且队列中所有待处理项都已消费后,消费者才能安全退出。
  6. 错误处理:在实际应用中,生产者和消费者内部应添加适当的错误处理机制,例如try-except块。
  7. 背压(Backpressure):asyncio.Queue可以有容量限制。如果生产者生产速度远快于消费者,队列会逐渐填满,最终q.put()会暂停,从而对生产者施加背压,防止内存无限增长。

总结

通过将异步任务分解为独立的生产者和消费者,并利用asyncio.Queue进行通信,我们成功地将原本串行执行的逻辑转换为了并发执行。这种模式不仅提高了资源利用率,也使得代码结构更加清晰,易于维护和扩展。在设计复杂的asyncio应用时,当存在数据流动的依赖但又希望实现任务并行时,生产者-消费者模式与asyncio.Queue是解决这类问题的强大工具。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

85

2023.09.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

480

2023.08.10

Golang gRPC 服务开发与Protobuf实战
Golang gRPC 服务开发与Protobuf实战

本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

8

2026.01.15

公务员递补名单公布时间 公务员递补要求
公务员递补名单公布时间 公务员递补要求

公务员递补名单公布时间不固定,通常在面试前,由招录单位(如国家知识产权局、海关等)发布,依据是原入围考生放弃资格,会按笔试成绩从高到低递补,递补考生需按公告要求限时确认并提交材料,及时参加面试/体检等后续环节。要求核心是按招录单位公告及时响应、提交材料(确认书、资格复审材料)并准时参加面试。

38

2026.01.15

公务员调剂条件 2026调剂公告时间
公务员调剂条件 2026调剂公告时间

(一)符合拟调剂职位所要求的资格条件。 (二)公共科目笔试成绩同时达到拟调剂职位和原报考职位的合格分数线,且考试类别相同。 拟调剂职位设置了专业科目笔试条件的,专业科目笔试成绩还须同时达到合格分数线,且考试类别相同。 (三)未进入原报考职位面试人员名单。

52

2026.01.15

国考成绩查询入口 国考分数公布时间2026
国考成绩查询入口 国考分数公布时间2026

笔试成绩查询入口已开通,考生可登录国家公务员局中央机关及其直属机构2026年度考试录用公务员专题网站http://bm.scs.gov.cn/pp/gkweb/core/web/ui/business/examResult/written_result.html,查询笔试成绩和合格分数线,点击“笔试成绩查询”按钮,凭借身份证及准考证进行查询。

10

2026.01.15

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

65

2026.01.14

php与html混编教程大全
php与html混编教程大全

本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

36

2026.01.13

PHP 高性能
PHP 高性能

本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

75

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.8万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号