0

0

FastAPI启动事件中AsyncGenerator依赖注入的正确实践

DDD

DDD

发布时间:2025-09-29 20:47:23

|

961人浏览过

|

来源于php中文网

原创

FastAPI启动事件中AsyncGenerator依赖注入的正确实践

本文探讨了在FastAPI应用的startup事件中直接使用Depends()与AsyncGenerator进行资源(如Redis连接)初始化时遇到的问题,并指出Depends()不适用于此场景。核心内容是提供并详细解释了如何通过FastAPI的lifespan上下文管理器来正确、优雅地管理异步生成器依赖,确保应用启动时资源正确初始化,避免AttributeError。

问题背景:在应用启动时初始化异步资源

在构建基于fastapi的异步应用时,我们经常需要在应用启动时初始化一些全局资源,例如数据库连接池、消息队列客户端或缓存连接。fastapi提供了@app.on_event("startup")装饰器来处理这些启动任务。同时,为了更好地管理资源生命周期,我们通常会使用异步生成器(asyncgenerator)来创建和关闭这些资源,并结合fastapi的依赖注入系统depends()。然而,当尝试在startup事件中直接将asyncgenerator与depends()结合使用时,可能会遇到意料之外的错误。

考虑一个场景,我们需要在FastAPI应用启动时获取一个Redis异步客户端,并将其用于初始化一个全局的任务队列。我们可能尝试编写如下代码:

import uvicorn
from fastapi import FastAPI, Depends
import redis.asyncio as redis
from redis.asyncio import Redis
from typing import AsyncGenerator
from rq import Queue # 假设rq是任务队列库

# 配置Redis连接
REDIS_HOST = "localhost"
REDIS_PORT = 6379

redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")

async def get_async_redis_client() -> AsyncGenerator[Redis, None]:
    """
    异步生成器,用于提供Redis客户端连接。
    """
    async with Redis.from_pool(redis_pool) as client:
        yield client

def process_data(data: str):
    """
    模拟一个处理数据的函数。
    """
    print(f"Processing data: {data}")

def create_app():
    app = FastAPI(docs_url='/')
    task_queue: Queue = None # 声明为None,稍后初始化

    @app.on_event("startup")
    async def startup_event(redis_conn: redis.asyncio.Redis = Depends(get_async_redis_client)):
        """
        尝试在startup事件中使用Depends()注入Redis连接。
        """
        nonlocal task_queue
        task_queue = Queue("task_queue", connection=redis_conn)
        print("Redis connection initialized in startup event.")

    @app.post("/add_data")
    async def add_data(data: str):
        """
        添加数据到任务队列。
        """
        if task_queue:
            task_queue.enqueue(process_data, data)
            return {"message": "Book in processing"}
        return {"message": "Task queue not initialized", "status": "error"}

    @app.get("/get_data")
    async def get_data():
        """
        示例接口。
        """
        return {"data": "kek"}

    return app

def main():
    uvicorn.run(
        f"{__name__}:create_app",
        host='0.0.0.0', port=8888,
        reload=True
    )

if __name__ == '__main__':
    main()

当运行上述代码并尝试向/add_data端点发送POST请求时,会收到一个AttributeError: 'Depends' object has no attribute 'pipeline'的错误。这表明在startup_event函数中,redis_conn变量并没有被解析成实际的redis.asyncio.Redis对象,而仍然是一个Depends对象。

理解FastAPI的依赖注入与启动事件

FastAPI的Depends()机制主要设计用于请求处理函数中的依赖解析。在请求处理的生命周期中,FastAPI会负责调用依赖函数(包括异步生成器),获取其yield出的值,并在请求结束后执行生成器中yield之后的清理代码。

然而,@app.on_event("startup")装饰器下的函数,其执行时机在整个应用开始接受请求之前,并且它不属于标准的请求-响应循环。FastAPI的依赖注入系统并不会像处理路由函数那样,自动解析startup事件函数参数中的Depends()。因此,redis_conn变量接收到的不是get_async_redis_client生成器yield出的Redis客户端实例,而是Depends(get_async_redis_client)这个Depends对象本身。当rq库尝试对这个Depends对象调用pipeline()方法时,自然会抛出AttributeError。

正确实践:利用lifespan管理异步生成器依赖

为了在应用启动时正确地初始化和管理异步生成器提供的资源,FastAPI推荐使用lifespan上下文管理器。lifespan是FastAPI 0.65.0版本引入的一种更现代、更灵活的应用生命周期管理方式,它允许我们定义一个异步上下文管理器,在应用启动前执行设置代码,并在应用关闭前执行清理代码。

MCP官网
MCP官网

Model Context Protocol(模型上下文协议)

下载

通过lifespan,我们可以手动调用异步生成器,获取其yield出的资源,并将其存储在应用实例或全局变量中,供其他部分使用。

以下是使用lifespan解决上述问题的正确方法:

import uvicorn
from fastapi import FastAPI
import redis.asyncio as redis
from redis.asyncio import Redis
from typing import AsyncGenerator
from rq import Queue # 假设rq是任务队列库
from contextlib import asynccontextmanager

# 配置Redis连接
REDIS_HOST = "localhost"
REDIS_PORT = 6379

redis_pool = redis.ConnectionPool.from_url(f"redis://{REDIS_HOST}:{REDIS_PORT}")

async def get_async_redis_client() -> AsyncGenerator[Redis, None]:
    """
    异步生成器,用于提供Redis客户端连接。
    """
    print("Opening Redis connection...")
    async with Redis.from_pool(redis_pool) as client:
        yield client
    print("Closing Redis connection...") # 应用关闭时执行

def process_data(data: str):
    """
    模拟一个处理数据的函数。
    """
    print(f"Processing data: {data}")

# 定义一个全局变量来存储任务队列
task_queue: Queue = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI应用生命周期管理器。
    在应用启动时初始化资源,在应用关闭时清理资源。
    """
    global task_queue # 声明使用全局变量

    # 手动调用异步生成器以获取Redis连接
    # 注意:这里直接调用get_async_redis_client(),并迭代它
    # app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)
    # 这一步是为了兼容可能存在的依赖覆盖,确保获取到的是最终的依赖函数
    redis_generator_func = app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)
    async for redis_conn in redis_generator_func():
        # 在这里,redis_conn已经是实际的Redis客户端对象
        task_queue = Queue("task_queue", connection=redis_conn)
        print("Redis connection and Task Queue initialized via lifespan.")
        yield # 应用在此处启动并处理请求
    # 应用关闭时,生成器会继续执行,清理Redis连接
    print("Application shutdown: Resources released.")

def create_app():
    app = FastAPI(
        docs_url='/',
        lifespan=lifespan # 将lifespan上下文管理器传递给FastAPI
    )

    @app.post("/add_data")
    async def add_data(data: str):
        """
        添加数据到任务队列。
        """
        if task_queue:
            task_queue.enqueue(process_data, data)
            return {"message": "Book in processing"}
        return {"message": "Task queue not initialized", "status": "error"}

    @app.get("/get_data")
    async def get_data():
        """
        示例接口。
        """
        return {"data": "kek"}

    return app

def main():
    uvicorn.run(
        f"{__name__}:create_app",
        host='0.0.0.0', port=8888,
        reload=True
    )

if __name__ == '__main__':
    main()

在这个修正后的代码中:

  1. @asynccontextmanager装饰器: 我们使用contextlib.asynccontextmanager装饰器将lifespan函数转换为一个异步上下文管理器。
  2. lifespan函数: 这个函数现在负责整个应用的生命周期。
    • 在yield之前,它执行应用启动时的初始化逻辑。我们在这里手动调用get_async_redis_client()异步生成器,并通过async for循环获取yield出的redis_conn对象。
    • app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)这一步是为了确保即使get_async_redis_client被覆盖(例如在测试环境中),lifespan也能获取到正确的依赖函数。
    • task_queue被正确地用解析后的redis_conn对象初始化。
    • yield语句将控制权交还给FastAPI,此时应用开始处理请求。
    • 当应用关闭时(例如通过发送中断信号),yield之后的代码会被执行,从而触发get_async_redis_client生成器中的清理逻辑(async with Redis.from_pool(redis_pool) as client:块的退出)。
  3. FastAPI(lifespan=lifespan): 在创建FastAPI应用实例时,通过lifespan参数注册我们定义的生命周期管理器。

注意事项

  • 全局变量管理: 在lifespan函数中修改全局变量(如task_queue)时,务必使用global关键字来指示您正在修改全局作用域的变量,而不是创建局部变量。
  • 依赖覆盖兼容性: app.dependency_overrides.get(get_async_redis_client, get_async_redis_client)是一个健壮的做法。它首先检查get_async_redis_client是否被app.dependency_overrides覆盖。如果没有,它就使用原始的get_async_redis_client函数。这对于测试和更复杂的应用场景非常有用。
  • 资源清理: 使用AsyncGenerator结合async with语句是管理异步资源生命周期的推荐方式。lifespan上下文管理器确保了AsyncGenerator的清理部分在应用关闭时被正确执行。
  • startup事件与lifespan: 尽管@app.on_event("startup")仍然可用,但对于需要复杂初始化和清理逻辑的资源,或者需要与依赖注入系统交互的场景,lifespan提供了更强大、更清晰的机制。

总结

在FastAPI中,Depends()装饰器是为请求处理函数设计的依赖注入机制,不适用于@app.on_event("startup")事件。当需要在应用启动时利用AsyncGenerator初始化全局资源时,正确的做法是使用FastAPI的lifespan上下文管理器。通过手动调用异步生成器并将其结果存储在全局变量中,我们可以确保资源在应用启动时被正确初始化,并在应用关闭时被优雅地清理,从而避免因依赖解析不当导致的AttributeError。这种方法不仅解决了特定问题,也体现了FastAPI在应用生命周期管理上的灵活性和强大功能。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

27

2025.12.22

全局变量怎么定义
全局变量怎么定义

本专题整合了全局变量相关内容,阅读专题下面的文章了解更多详细内容。

78

2025.09.18

python 全局变量
python 全局变量

本专题整合了python中全局变量定义相关教程,阅读专题下面的文章了解更多详细内容。

96

2025.09.18

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

978

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

634

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

485

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

399

2024.04.07

redis怎么解决数据一致性
redis怎么解决数据一致性

redis 提供了两种一致性模型,以维护副本数据一致性:强一致性 (sync) 确保写操作仅在复制到所有从节点后才完成;最终一致性 (async) 则在主节点上写操作后认为已完成,牺牲一致性换取性能。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

393

2024.04.07

Python 自然语言处理(NLP)基础与实战
Python 自然语言处理(NLP)基础与实战

本专题系统讲解 Python 在自然语言处理(NLP)领域的基础方法与实战应用,涵盖文本预处理(分词、去停用词)、词性标注、命名实体识别、关键词提取、情感分析,以及常用 NLP 库(NLTK、spaCy)的核心用法。通过真实文本案例,帮助学习者掌握 使用 Python 进行文本分析与语言数据处理的完整流程,适用于内容分析、舆情监测与智能文本应用场景。

0

2026.01.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
进程与SOCKET
进程与SOCKET

共6课时 | 0.4万人学习

Redis+MySQL数据库面试教程
Redis+MySQL数据库面试教程

共72课时 | 6.5万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号