
FastAPI 的 BackgroundTasks 不适用于 WebSocket 场景,因其依赖 HTTP 请求生命周期;应改用 asyncio.create_task() 启动真正的异步后台协程,并注意复用 HTTP 客户端以提升性能。
fastapi 的 `backgroundtasks` 不适用于 websocket 场景,因其依赖 http 请求生命周期;应改用 `asyncio.create_task()` 启动真正的异步后台协程,并注意复用 http 客户端以提升性能。
在 FastAPI 中,BackgroundTasks 是专为 HTTP 请求响应周期 设计的机制:它在请求返回客户端后、但响应尚未完全关闭前执行任务。然而,WebSocket 是一个长连接、双向持续通信的协议,没有“请求完成即返回”的语义——因此 BackgroundTasks 在 @websocket 路由中根本不会被触发,这也是你遇到 call_external_server 未执行的根本原因。
✅ 正确做法是:直接使用 asyncio.create_task() 启动一个独立的协程任务。该任务与 WebSocket 连接生命周期解耦,可并行运行、无需等待连接结束。
以下是优化后的完整实现示例:
import asyncio
import aiohttp
from fastapi import FastAPI, WebSocket, BackgroundTasks
from redis import asyncio as aioredis
app = FastAPI()
# ✅ 推荐:在应用启动时创建并复用 HTTP Client Session
# 避免每次调用都新建 session(开销大、易耗尽连接池)
@app.on_event("startup")
async def startup_event():
app.state.session = aiohttp.ClientSession()
@app.on_event("shutdown")
async def shutdown_event():
await app.state.session.close()
async def call_external_server(channel: str, text: str):
print(f"[Background] Triggering external server for channel: {channel}")
try:
# ✅ 复用全局 session,而非每次新建
async with app.state.session.get(
f"http://localhost:9000/pub?channel={channel}&text={text}"
) as resp:
print(f"[Background] External response status: {resp.status}")
await resp.text() # 确保读取完成
except Exception as e:
print(f"[Background] Failed to call external server: {e}")
print(f"[Background] Done for channel: {channel}")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 提取参数(示例:从 query 或 headers 获取)
# 注意:WebSocket upgrade request 的 query params 可通过 websocket.url.query
from urllib.parse import parse_qs
query_params = parse_qs(websocket.url.query)
channel = query_params.get("channel", ["default"])[0]
text = query_params.get("text", [""])[0]
# 初始化 Redis Pub/Sub
redis = await aioredis.from_url("redis://localhost")
pubsub = redis.pubsub()
await pubsub.subscribe(channel)
# ✅ 关键修复:使用 asyncio.create_task 启动真正异步后台任务
# 此任务立即调度,不阻塞 WebSocket 主循环
asyncio.create_task(call_external_server(channel, text))
try:
while True:
# 非阻塞轮询,避免长时间挂起
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
if message is not None:
data = message.get("data")
if isinstance(data, bytes):
decoded = data.decode()
if decoded == "STOP": # 建议使用统一终止信号
print("(Reader) Received STOP signal")
break
await websocket.send_text(decoded)
except Exception as e:
print(f"[WebSocket] Error: {e}")
finally:
await pubsub.unsubscribe(channel)
await redis.close()
await websocket.close()? 关键注意事项:
- 不要在 WebSocket 中使用 BackgroundTasks:它底层强依赖 Request 对象和 Starlette 的 Response 生命周期,而 WebSocket 连接不产生标准 HTTP 响应。
- 优先复用 aiohttp.ClientSession:在 startup 事件中创建单例 session,避免高频创建/销毁连接导致性能下降或 ConnectionResetError。
- 合理处理 get_message(timeout=...):设置超时防止无限阻塞;结合 await asyncio.sleep(0) 可让出控制权,保证后台任务有机会执行。
- 资源清理务必放在 finally 块中:确保无论是否异常,Redis 订阅和 WebSocket 连接都能被正确释放。
- 终止逻辑建议解耦:如需通知外部服务停止监听,可通过另一路 Redis channel 或 HTTP 回调实现,而非依赖单一 STOP 字符串。
通过以上改造,你的后台调用将真正异步、可靠、高性能地运行于 WebSocket 上下文中,完美支撑“触发即返回、结果异步推送”的典型实时通信模式。










