在 FastAPI 的 WebSocket 连接中,BackgroundTasks 不可用;必须改用 asyncio.create_task() 启动真正的异步后台协程,才能确保外部服务调用与 Redis 订阅逻辑并行执行。
在 fastapi 的 websocket 连接中,`backgroundtasks` 不可用;必须改用 `asyncio.create_task()` 启动真正的异步后台协程,才能确保外部服务调用与 redis 订阅逻辑并行执行。
WebSocket 是长连接、双向持续通信协议,而 FastAPI 的 BackgroundTasks 机制专为 HTTP 请求生命周期设计:它依赖 Request 对象,在响应返回客户端后立即触发,并在应用层自动管理任务生命周期(如异常捕获、作用域清理)。但 WebSocket 连接一旦建立,就不再有“请求完成”这一语义——连接长期保持,BackgroundTasks 因缺少上下文而无法注册或执行,这也是你调用 background_task.add_task(...) 后函数静默失效的根本原因。
要实现在 WebSocket 接入时立即触发外部服务调用(如通知下游系统向 Redis 发布消息),同时持续监听 Redis Pub/Sub 通道并将结果实时推送给客户端,应采用原生 asyncio 任务调度方式:
✅ 推荐方案:使用 asyncio.create_task()
该方法将协程直接提交至事件循环,不依赖任何请求上下文,完全适配 WebSocket 场景。注意:需手动处理异常(否则后台任务崩溃将无声失败),建议包裹 try/except 并记录日志。
以下是修正后的完整示例(含关键注释与健壮性增强):
import asyncio
import logging
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from redis import asyncio as aioredis
import aiohttp
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ✅ 全局复用 HTTP 客户端(避免每次新建 session,提升性能与连接复用)
HTTP_CLIENT = None
@app.on_event("startup")
async def startup_event():
global HTTP_CLIENT
HTTP_CLIENT = aiohttp.ClientSession()
@app.on_event("shutdown")
async def shutdown_event():
if HTTP_CLIENT:
await HTTP_CLIENT.close()
async def call_external_server(channel: str, text: str):
"""异步调用外部服务,触发其向 Redis 发布数据"""
try:
logger.info(f"Triggering external server for channel: {channel}, text: {text}")
async with HTTP_CLIENT.get(
f"http://localhost:9000/pub?channel={channel}&text={text}"
) as resp:
logger.debug(f"External server response: {resp.status}")
logger.info("External server notified successfully")
except Exception as e:
logger.error(f"Failed to notify external server: {e}")
STOPWORD = "STOP"
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 从 headers 或 query 获取参数(示例中假设通过 header 传递)
headers = dict(websocket.headers)
channel = headers.get("x-channel", "default")
text = headers.get("x-text", "")
# 初始化 Redis Pub/Sub
redis_client = await aioredis.from_url("redis://localhost")
pubsub = redis_client.pubsub()
await pubsub.subscribe(channel)
logger.info(f"Subscribed to Redis channel: {channel}")
# ✅ 正确启动后台任务:使用 asyncio.create_task
# 注意:此处不 await,否则会阻塞 WebSocket 主循环
asyncio.create_task(call_external_server(channel, text))
try:
while True:
# 非阻塞获取消息,超时 1 秒避免永久挂起
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if message is not None:
if isinstance(message["data"], bytes):
decoded_msg = message["data"].decode("utf-8")
if decoded_msg == STOPWORD:
logger.info("(Reader) Received STOP signal")
break
await websocket.send_text(decoded_msg)
except WebSocketDisconnect:
logger.info("Client disconnected")
except Exception as e:
logger.error(f"WebSocket error: {e}")
finally:
await pubsub.unsubscribe(channel)
await redis_client.close()
await websocket.close()? 关键注意事项:
- 绝不 await 后台任务:asyncio.create_task() 返回 Task 对象,直接调用即可并发执行;若 await 它,将退化为串行,失去后台意义。
- 异常必须显式捕获:create_task 启动的任务若抛出未捕获异常,会静默终止且不中断主流程,务必在任务内部 try/except 并记录日志。
- 资源复用至关重要:如示例所示,aiohttp.ClientSession 和 aioredis.Redis 应在应用生命周期内复用(startup/shutdown 事件管理),避免高频创建连接导致资源耗尽。
- Redis Pub/Sub 需主动清理:WebSocket 断开时务必 unsubscribe 并关闭 client,防止内存泄漏和订阅堆积。
? 进阶提示:若需更严格的后台任务生命周期管理(如取消、等待完成、错误传播),可考虑封装为 asyncio.TaskGroup(Python 3.11+)或结合 asyncio.shield() + asyncio.wait_for() 实现超时控制与取消联动。
掌握这一模式,你就能在 FastAPI WebSocket 场景中可靠地解耦“触发动作”与“监听响应”,构建真正响应式、高并发的实时通信服务。










