0

0

如何在 FastAPI 的 WebSocket 中正确执行异步后台任务

碧海醫心

碧海醫心

发布时间:2026-03-17 11:40:08

|

761人浏览过

|

来源于php中文网

原创

如何在 FastAPI 的 WebSocket 中正确执行异步后台任务

FastAPI 的 BackgroundTasks 不适用于 WebSocket 场景,因其依赖 HTTP 请求生命周期;应改用 asyncio.create_task() 启动真正的异步后台协程,并注意复用 HTTP 客户端以提升性能。

fastapi 的 `backgroundtasks` 不适用于 websocket 场景,因其依赖 http 请求生命周期;应改用 `asyncio.create_task()` 启动真正的异步后台协程,并注意复用 http 客户端以提升性能。

在 FastAPI 中,BackgroundTasks 是专为 HTTP 请求响应周期 设计的机制:它在请求返回客户端后、但响应尚未完全关闭前执行任务。然而,WebSocket 是一个长连接、双向持续通信的协议,没有“请求完成即返回”的语义——因此 BackgroundTasks 在 @websocket 路由中根本不会被触发,这也是你遇到 call_external_server 未执行的根本原因。

✅ 正确做法是:直接使用 asyncio.create_task() 启动一个独立的协程任务。该任务与 WebSocket 连接生命周期解耦,可并行运行、无需等待连接结束。

以下是优化后的完整实现示例:

皮卡智能
皮卡智能

AI驱动高效视觉设计平台

下载
import asyncio
import aiohttp
from fastapi import FastAPI, WebSocket, BackgroundTasks
from redis import asyncio as aioredis

app = FastAPI()

# ✅ 推荐:在应用启动时创建并复用 HTTP Client Session
# 避免每次调用都新建 session(开销大、易耗尽连接池)
@app.on_event("startup")
async def startup_event():
    app.state.session = aiohttp.ClientSession()

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.session.close()

async def call_external_server(channel: str, text: str):
    print(f"[Background] Triggering external server for channel: {channel}")
    try:
        # ✅ 复用全局 session,而非每次新建
        async with app.state.session.get(
            f"http://localhost:9000/pub?channel={channel}&text={text}"
        ) as resp:
            print(f"[Background] External response status: {resp.status}")
            await resp.text()  # 确保读取完成
    except Exception as e:
        print(f"[Background] Failed to call external server: {e}")
    print(f"[Background] Done for channel: {channel}")

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # 提取参数(示例:从 query 或 headers 获取)
    # 注意:WebSocket upgrade request 的 query params 可通过 websocket.url.query
    from urllib.parse import parse_qs
    query_params = parse_qs(websocket.url.query)
    channel = query_params.get("channel", ["default"])[0]
    text = query_params.get("text", [""])[0]

    # 初始化 Redis Pub/Sub
    redis = await aioredis.from_url("redis://localhost")
    pubsub = redis.pubsub()
    await pubsub.subscribe(channel)

    # ✅ 关键修复:使用 asyncio.create_task 启动真正异步后台任务
    # 此任务立即调度,不阻塞 WebSocket 主循环
    asyncio.create_task(call_external_server(channel, text))

    try:
        while True:
            # 非阻塞轮询,避免长时间挂起
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
            if message is not None:
                data = message.get("data")
                if isinstance(data, bytes):
                    decoded = data.decode()
                    if decoded == "STOP":  # 建议使用统一终止信号
                        print("(Reader) Received STOP signal")
                        break
                    await websocket.send_text(decoded)
    except Exception as e:
        print(f"[WebSocket] Error: {e}")
    finally:
        await pubsub.unsubscribe(channel)
        await redis.close()
        await websocket.close()

? 关键注意事项:

  • 不要在 WebSocket 中使用 BackgroundTasks:它底层强依赖 Request 对象和 Starlette 的 Response 生命周期,而 WebSocket 连接不产生标准 HTTP 响应。
  • 优先复用 aiohttp.ClientSession:在 startup 事件中创建单例 session,避免高频创建/销毁连接导致性能下降或 ConnectionResetError。
  • 合理处理 get_message(timeout=...):设置超时防止无限阻塞;结合 await asyncio.sleep(0) 可让出控制权,保证后台任务有机会执行。
  • 资源清理务必放在 finally 块中:确保无论是否异常,Redis 订阅和 WebSocket 连接都能被正确释放。
  • 终止逻辑建议解耦:如需通知外部服务停止监听,可通过另一路 Redis channel 或 HTTP 回调实现,而非依赖单一 STOP 字符串。

通过以上改造,你的后台调用将真正异步、可靠、高性能地运行于 WebSocket 上下文中,完美支撑“触发即返回、结果异步推送”的典型实时通信模式。

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

29

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

253

2026.02.06

session失效的原因
session失效的原因

session失效的原因有会话超时、会话数量限制、会话完整性检查、服务器重启、浏览器或设备问题等等。详细介绍:1、会话超时:服务器为Session设置了一个默认的超时时间,当用户在一段时间内没有与服务器交互时,Session将自动失效;2、会话数量限制:服务器为每个用户的Session数量设置了一个限制,当用户创建的Session数量超过这个限制时,最新的会覆盖最早的等等。

337

2023.10.17

session失效解决方法
session失效解决方法

session失效通常是由于 session 的生存时间过期或者服务器关闭导致的。其解决办法:1、延长session的生存时间;2、使用持久化存储;3、使用cookie;4、异步更新session;5、使用会话管理中间件。

776

2023.10.18

cookie与session的区别
cookie与session的区别

本专题整合了cookie与session的区别和使用方法等相关内容,阅读专题下面的文章了解更详细的内容。

97

2025.08.19

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

761

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

221

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1570

2023.10.24

c++ 字符处理
c++ 字符处理

本专题整合了c++字符处理教程、字符串处理函数相关内容,阅读专题下面的文章了解更多详细内容。

0

2026.03.17

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号