
本文介绍如何使用 python 的 `threading` 模块,在 fastapi websocket 服务中安全、可控地并发执行两个长期运行的阻塞函数(如持续写盘的 `foo()` 和轮询读盘的 `bar()`),避免阻塞事件循环,支持按需启停,并与 websocket 生命周期联动。
在构建实时流式服务(如语音转录、日志监控、传感器数据采集)时,常需并行执行两类阻塞操作:一类是持续生成数据并落盘(如 foo()),另一类是轮询磁盘文件、处理并实时推送结果(如 bar())。由于二者天然 I/O 密集、逻辑稳定且依赖真实文件系统,强行改造成 async 不仅增加复杂度(需重写文件 I/O、管理内存缓冲),还可能引入竞态或难以优雅终止的问题——正如提问者所指出的:asyncio 任务难中断、共享状态易失效、背景任务不可控。
此时,多线程(threading)是更简洁、可靠的选择:它天然隔离阻塞调用,无需修改函数签名;通过 threading.Event 可实现跨线程信号通信;结合 weakref 或上下文管理,能精准绑定到 WebSocket 连接生命周期。
✅ 推荐实践:带生命周期管理的线程封装
以下是一个生产就绪的实现方案,核心要点包括:
- 使用 threading.Thread 封装 foo 和 bar,确保二者真正并发;
- 通过 threading.Event 实现统一启停控制;
- 在 WebSocket 断开或收到 "STOP" 命令时,安全停止线程并清理资源;
- 避免全局状态污染,为每个客户端分配独立线程与存储路径。
import threading
import time
import json
import os
from pathlib import Path
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Optional
# 全局注册表:client_id → {foo_thread, bar_thread, stop_event, storage_dir}
ACTIVE_CLIENTS: Dict[int, Dict] = {}
def foo(url: str, client_id: int, stop_event: threading.Event, storage_dir: Path):
"""阻塞式数据生成器:持续写入磁盘"""
storage_dir.mkdir(exist_ok=True)
counter = 0
while not stop_event.is_set():
# 模拟耗时工作(如音频分片处理 + 写文件)
output_file = storage_dir / f"chunk_{counter}.txt"
with open(output_file, "w") as f:
f.write(f"Data from foo at {time.time():.2f}\n")
print(f"[foo-{client_id}] Wrote {output_file.name}")
counter += 1
time.sleep(1.5) # 模拟不规则写入节奏
def bar(client_id: int, stop_event: threading.Event, storage_dir: Path, websocket: WebSocket):
"""阻塞式数据消费者:轮询读取新文件并推送至 WebSocket"""
processed = set()
while not stop_event.is_set():
# 扫描目录中所有 .txt 文件
for p in storage_dir.glob("chunk_*.txt"):
if p.name not in processed:
try:
with open(p, "r") as f:
content = f.read().strip()
# 推送结果(注意:需在主线程中 await)
# → 此处通过回调机制交由主线程执行
asyncio.create_task(websocket.send_text(json.dumps({
"type": "transcript",
"data": content,
"timestamp": time.time()
})))
processed.add(p.name)
print(f"[bar-{client_id}] Sent {p.name}")
except Exception as e:
print(f"[bar-{client_id}] Error reading {p}: {e}")
time.sleep(0.8) # 轮询间隔,避免过度占用 CPU
# ⚠️ 关键:必须在主线程中导入 asyncio(FastAPI 环境已存在)
import asyncio
@app.websocket("/live-transcription")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
client_id = id(websocket)
storage_dir = Path(f"./storage/client_{client_id}")
stop_event = threading.Event()
foo_thread = None
bar_thread = None
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
command = message.get("command")
url = message.get("url", "")
if command == "STOP":
print(f"[Client {client_id}] Received STOP")
stop_event.set() # 通知线程退出
if foo_thread and foo_thread.is_alive():
foo_thread.join(timeout=2.0) # 最多等待 2 秒
if bar_thread and bar_thread.is_alive():
bar_thread.join(timeout=2.0)
break
elif command == "START":
print(f"[Client {client_id}] Starting foo & bar...")
# 启动线程(target 函数需接收所有必要参数)
foo_thread = threading.Thread(
target=foo,
args=(url, client_id, stop_event, storage_dir),
name=f"foo-{client_id}",
daemon=True # 防止主线程退出时残留
)
bar_thread = threading.Thread(
target=bar,
args=(client_id, stop_event, storage_dir, websocket),
name=f"bar-{client_id}",
daemon=True
)
foo_thread.start()
bar_thread.start()
# 注册到活跃客户端列表(便于调试/监控)
ACTIVE_CLIENTS[client_id] = {
"foo_thread": foo_thread,
"bar_thread": bar_thread,
"stop_event": stop_event,
"storage_dir": storage_dir
}
except WebSocketDisconnect:
print(f"[Client {client_id}] Disconnected abruptly")
except Exception as e:
print(f"[Client {client_id}] Error: {e}")
finally:
# 强制清理
stop_event.set()
if foo_thread and foo_thread.is_alive():
foo_thread.join(timeout=1.0)
if bar_thread and bar_thread.is_alive():
bar_thread.join(timeout=1.0)
# 清理临时目录(可选)
if storage_dir.exists():
for f in storage_dir.iterdir():
f.unlink(missing_ok=True)
storage_dir.rmdir()
ACTIVE_CLIENTS.pop(client_id, None)
await websocket.close()? 注意事项与最佳实践
- 线程安全的 WebSocket 发送:websocket.send_text() 是协程,不能在线程中直接 await。示例中采用 asyncio.create_task() 将发送任务提交到事件循环主线程,这是安全且推荐的方式。
- 优雅终止:threading.Event 是最轻量的线程间通信方式;配合 join(timeout=...) 可防止无限等待;daemon=True 确保主线程退出时子线程自动结束(适用于后台服务场景)。
- 资源隔离:为每个客户端创建独立 storage_dir,避免多连接间文件冲突;使用 id(websocket) 作为临时 client_id(生产环境建议替换为 JWT 解析出的用户 ID)。
- 性能权衡:线程切换开销远小于 asyncio 的协程调度复杂度,尤其适合 CPU/I/O 混合型阻塞任务;GIL 对纯 I/O 操作影响极小。
- 调试技巧:通过 threading.enumerate() 查看当前活跃线程;给线程命名(name= 参数)便于日志追踪。
✅ 总结
当面对“两个阻塞函数需长期并发运行 + 需与 WebSocket 生命周期强绑定 + 拒绝复杂异步改造”这一典型场景时,threading 不是退而求其次的方案,而是更直接、更可控、更符合 Unix 哲学(做一件事,并做好)的技术选择。它规避了 asyncio 的取消陷阱、共享状态难题和调试黑盒,让开发焦点回归业务逻辑本身。只要遵循事件通知、超时等待、资源隔离三原则,即可构建出健壮、可维护的实时流式服务。









