0

0

FastAPI WebSocket 中正确执行异步后台任务的完整指南

聖光之護

聖光之護

发布时间:2026-03-17 12:16:03

|

550人浏览过

|

来源于php中文网

原创

在 FastAPI 的 WebSocket 连接中,BackgroundTasks 不可用;必须改用 asyncio.create_task() 启动真正的异步后台协程,才能确保外部服务调用与 Redis 订阅逻辑并行执行。

在 fastapi 的 websocket 连接中,`backgroundtasks` 不可用;必须改用 `asyncio.create_task()` 启动真正的异步后台协程,才能确保外部服务调用与 redis 订阅逻辑并行执行。

WebSocket 是长连接、双向持续通信协议,而 FastAPI 的 BackgroundTasks 机制专为 HTTP 请求生命周期设计:它依赖 Request 对象,在响应返回客户端后立即触发,并在应用层自动管理任务生命周期(如异常捕获、作用域清理)。但 WebSocket 连接一旦建立,就不再有“请求完成”这一语义——连接长期保持,BackgroundTasks 因缺少上下文而无法注册或执行,这也是你调用 background_task.add_task(...) 后函数静默失效的根本原因。

要实现在 WebSocket 接入时立即触发外部服务调用(如通知下游系统向 Redis 发布消息),同时持续监听 Redis Pub/Sub 通道并将结果实时推送给客户端,应采用原生 asyncio 任务调度方式:

推荐方案:使用 asyncio.create_task()
该方法将协程直接提交至事件循环,不依赖任何请求上下文,完全适配 WebSocket 场景。注意:需手动处理异常(否则后台任务崩溃将无声失败),建议包裹 try/except 并记录日志。

以下是修正后的完整示例(含关键注释与健壮性增强):

import asyncio
import logging
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis
import aiohttp

app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ✅ 全局复用 HTTP 客户端(避免每次新建 session,提升性能与连接复用)
HTTP_CLIENT = None

@app.on_event("startup")
async def startup_event():
    global HTTP_CLIENT
    HTTP_CLIENT = aiohttp.ClientSession()

@app.on_event("shutdown")
async def shutdown_event():
    if HTTP_CLIENT:
        await HTTP_CLIENT.close()

async def call_external_server(channel: str, text: str):
    """异步调用外部服务,触发其向 Redis 发布数据"""
    try:
        logger.info(f"Triggering external server for channel: {channel}, text: {text}")
        async with HTTP_CLIENT.get(
            f"http://localhost:9000/pub?channel={channel}&text={text}"
        ) as resp:
            logger.debug(f"External server response: {resp.status}")
        logger.info("External server notified successfully")
    except Exception as e:
        logger.error(f"Failed to notify external server: {e}")

STOPWORD = "STOP"

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # 从 headers 或 query 获取参数(示例中假设通过 header 传递)
    headers = dict(websocket.headers)
    channel = headers.get("x-channel", "default")
    text = headers.get("x-text", "")

    # 初始化 Redis Pub/Sub
    redis_client = await aioredis.from_url("redis://localhost")
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)
    logger.info(f"Subscribed to Redis channel: {channel}")

    # ✅ 正确启动后台任务:使用 asyncio.create_task
    # 注意:此处不 await,否则会阻塞 WebSocket 主循环
    asyncio.create_task(call_external_server(channel, text))

    try:
        while True:
            # 非阻塞获取消息,超时 1 秒避免永久挂起
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message is not None:
                if isinstance(message["data"], bytes):
                    decoded_msg = message["data"].decode("utf-8")
                    if decoded_msg == STOPWORD:
                        logger.info("(Reader) Received STOP signal")
                        break
                    await websocket.send_text(decoded_msg)
    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"WebSocket error: {e}")
    finally:
        await pubsub.unsubscribe(channel)
        await redis_client.close()
        await websocket.close()

? 关键注意事项:

皮卡智能
皮卡智能

AI驱动高效视觉设计平台

下载
  • 绝不 await 后台任务:asyncio.create_task() 返回 Task 对象,直接调用即可并发执行;若 await 它,将退化为串行,失去后台意义。
  • 异常必须显式捕获:create_task 启动的任务若抛出未捕获异常,会静默终止且不中断主流程,务必在任务内部 try/except 并记录日志。
  • 资源复用至关重要:如示例所示,aiohttp.ClientSession 和 aioredis.Redis 应在应用生命周期内复用(startup/shutdown 事件管理),避免高频创建连接导致资源耗尽。
  • Redis Pub/Sub 需主动清理:WebSocket 断开时务必 unsubscribe 并关闭 client,防止内存泄漏和订阅堆积。

? 进阶提示:若需更严格的后台任务生命周期管理(如取消、等待完成、错误传播),可考虑封装为 asyncio.TaskGroup(Python 3.11+)或结合 asyncio.shield() + asyncio.wait_for() 实现超时控制与取消联动。

掌握这一模式,你就能在 FastAPI WebSocket 场景中可靠地解耦“触发动作”与“监听响应”,构建真正响应式、高并发的实时通信服务。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

29

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

253

2026.02.06

堆和栈的区别
堆和栈的区别

堆和栈的区别:1、内存分配方式不同;2、大小不同;3、数据访问方式不同;4、数据的生命周期。本专题为大家提供堆和栈的区别的相关的文章、下载、课程内容,供大家免费下载体验。

448

2023.07.18

堆和栈区别
堆和栈区别

堆(Heap)和栈(Stack)是计算机中两种常见的内存分配机制。它们在内存管理的方式、分配方式以及使用场景上有很大的区别。本文将详细介绍堆和栈的特点、区别以及各自的使用场景。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

606

2023.08.10

常用的数据库软件
常用的数据库软件

常用的数据库软件有MySQL、Oracle、SQL Server、PostgreSQL、MongoDB、Redis、Cassandra、Hadoop、Spark和Amazon DynamoDB。更多关于数据库软件的内容详情请看本专题下面的文章。php中文网欢迎大家前来学习。

1010

2023.11.02

内存数据库有哪些
内存数据库有哪些

内存数据库有Redis、Memcached、Apache Ignite、VoltDB、TimesTen、H2 Database、Aerospike、Oracle TimesTen In-Memory Database、SAP HANA和ache Cassandra。更多关于内存数据库相关问题,详情请看本专题下面的文章。php中文网欢迎大家前来学习。

675

2023.11.14

mongodb和redis哪个读取速度快
mongodb和redis哪个读取速度快

redis 的读取速度比 mongodb 更快。原因包括:1. redis 使用简单的键值存储,而 mongodb 存储 json 格式的数据,需要解析和反序列化。2. redis 使用哈希表快速查找数据,而 mongodb 使用 b-tree 索引。因此,redis 在需要高性能读取操作的应用程序中是一个更好的选择。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

501

2024.04.02

redis怎么做缓存服务器
redis怎么做缓存服务器

redis 作为缓存服务器的答案:redis 是一款开源、高性能、分布式的键值存储,可作为缓存服务器使用。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

416

2024.04.07

c++ 字符处理
c++ 字符处理

本专题整合了c++字符处理教程、字符串处理函数相关内容,阅读专题下面的文章了解更多详细内容。

0

2026.03.17

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号