0

0

如何在 FastAPI WebSocket 中并发运行两个阻塞函数(线程方案)

霞舞

霞舞

发布时间:2026-02-12 22:56:56

|

422人浏览过

|

来源于php中文网

原创

如何在 FastAPI WebSocket 中并发运行两个阻塞函数(线程方案)

本文介绍如何使用 python 的 `threading` 模块,在 fastapi websocket 服务中安全、可控地并发执行两个长期运行的阻塞函数(如持续写盘的 `foo()` 和轮询读盘的 `bar()`),避免阻塞事件循环,支持按需启停,并与 websocket 生命周期联动。

在构建实时流式服务(如语音转录、日志监控、传感器数据采集)时,常需并行执行两类阻塞操作:一类是持续生成数据并落盘(如 foo()),另一类是轮询磁盘文件、处理并实时推送结果(如 bar())。由于二者天然 I/O 密集、逻辑稳定且依赖真实文件系统,强行改造成 async 不仅增加复杂度(需重写文件 I/O、管理内存缓冲),还可能引入竞态或难以优雅终止的问题——正如提问者所指出的:asyncio 任务难中断、共享状态易失效、背景任务不可控。

此时,多线程(threading)是更简洁、可靠的选择:它天然隔离阻塞调用,无需修改函数签名;通过 threading.Event 可实现跨线程信号通信;结合 weakref 或上下文管理,能精准绑定到 WebSocket 连接生命周期。

Veed AI Voice Generator
Veed AI Voice Generator

Veed推出的AI语音生成器

下载

✅ 推荐实践:带生命周期管理的线程封装

以下是一个生产就绪的实现方案,核心要点包括:

  • 使用 threading.Thread 封装 foo 和 bar,确保二者真正并发;
  • 通过 threading.Event 实现统一启停控制;
  • 在 WebSocket 断开或收到 "STOP" 命令时,安全停止线程并清理资源;
  • 避免全局状态污染,为每个客户端分配独立线程与存储路径。
import threading
import time
import json
import os
from pathlib import Path
from fastapi import WebSocket, WebSocketDisconnect
from typing import Dict, Optional

# 全局注册表:client_id → {foo_thread, bar_thread, stop_event, storage_dir}
ACTIVE_CLIENTS: Dict[int, Dict] = {}

def foo(url: str, client_id: int, stop_event: threading.Event, storage_dir: Path):
    """阻塞式数据生成器:持续写入磁盘"""
    storage_dir.mkdir(exist_ok=True)
    counter = 0
    while not stop_event.is_set():
        # 模拟耗时工作(如音频分片处理 + 写文件)
        output_file = storage_dir / f"chunk_{counter}.txt"
        with open(output_file, "w") as f:
            f.write(f"Data from foo at {time.time():.2f}\n")
        print(f"[foo-{client_id}] Wrote {output_file.name}")
        counter += 1
        time.sleep(1.5)  # 模拟不规则写入节奏

def bar(client_id: int, stop_event: threading.Event, storage_dir: Path, websocket: WebSocket):
    """阻塞式数据消费者:轮询读取新文件并推送至 WebSocket"""
    processed = set()
    while not stop_event.is_set():
        # 扫描目录中所有 .txt 文件
        for p in storage_dir.glob("chunk_*.txt"):
            if p.name not in processed:
                try:
                    with open(p, "r") as f:
                        content = f.read().strip()
                    # 推送结果(注意:需在主线程中 await)
                    # → 此处通过回调机制交由主线程执行
                    asyncio.create_task(websocket.send_text(json.dumps({
                        "type": "transcript",
                        "data": content,
                        "timestamp": time.time()
                    })))
                    processed.add(p.name)
                    print(f"[bar-{client_id}] Sent {p.name}")
                except Exception as e:
                    print(f"[bar-{client_id}] Error reading {p}: {e}")
        time.sleep(0.8)  # 轮询间隔,避免过度占用 CPU

# ⚠️ 关键:必须在主线程中导入 asyncio(FastAPI 环境已存在)
import asyncio

@app.websocket("/live-transcription")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    client_id = id(websocket)
    storage_dir = Path(f"./storage/client_{client_id}")

    stop_event = threading.Event()
    foo_thread = None
    bar_thread = None

    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            command = message.get("command")
            url = message.get("url", "")

            if command == "STOP":
                print(f"[Client {client_id}] Received STOP")
                stop_event.set()  # 通知线程退出
                if foo_thread and foo_thread.is_alive():
                    foo_thread.join(timeout=2.0)  # 最多等待 2 秒
                if bar_thread and bar_thread.is_alive():
                    bar_thread.join(timeout=2.0)
                break

            elif command == "START":
                print(f"[Client {client_id}] Starting foo & bar...")
                # 启动线程(target 函数需接收所有必要参数)
                foo_thread = threading.Thread(
                    target=foo,
                    args=(url, client_id, stop_event, storage_dir),
                    name=f"foo-{client_id}",
                    daemon=True  # 防止主线程退出时残留
                )
                bar_thread = threading.Thread(
                    target=bar,
                    args=(client_id, stop_event, storage_dir, websocket),
                    name=f"bar-{client_id}",
                    daemon=True
                )
                foo_thread.start()
                bar_thread.start()

                # 注册到活跃客户端列表(便于调试/监控)
                ACTIVE_CLIENTS[client_id] = {
                    "foo_thread": foo_thread,
                    "bar_thread": bar_thread,
                    "stop_event": stop_event,
                    "storage_dir": storage_dir
                }

    except WebSocketDisconnect:
        print(f"[Client {client_id}] Disconnected abruptly")
    except Exception as e:
        print(f"[Client {client_id}] Error: {e}")
    finally:
        # 强制清理
        stop_event.set()
        if foo_thread and foo_thread.is_alive():
            foo_thread.join(timeout=1.0)
        if bar_thread and bar_thread.is_alive():
            bar_thread.join(timeout=1.0)
        # 清理临时目录(可选)
        if storage_dir.exists():
            for f in storage_dir.iterdir():
                f.unlink(missing_ok=True)
            storage_dir.rmdir()
        ACTIVE_CLIENTS.pop(client_id, None)
        await websocket.close()

? 注意事项与最佳实践

  • 线程安全的 WebSocket 发送:websocket.send_text() 是协程,不能在线程中直接 await。示例中采用 asyncio.create_task() 将发送任务提交到事件循环主线程,这是安全且推荐的方式。
  • 优雅终止:threading.Event 是最轻量的线程间通信方式;配合 join(timeout=...) 可防止无限等待;daemon=True 确保主线程退出时子线程自动结束(适用于后台服务场景)。
  • 资源隔离:为每个客户端创建独立 storage_dir,避免多连接间文件冲突;使用 id(websocket) 作为临时 client_id(生产环境建议替换为 JWT 解析出的用户 ID)。
  • 性能权衡:线程切换开销远小于 asyncio 的协程调度复杂度,尤其适合 CPU/I/O 混合型阻塞任务;GIL 对纯 I/O 操作影响极小。
  • 调试技巧:通过 threading.enumerate() 查看当前活跃线程;给线程命名(name= 参数)便于日志追踪。

✅ 总结

当面对“两个阻塞函数需长期并发运行 + 需与 WebSocket 生命周期强绑定 + 拒绝复杂异步改造”这一典型场景时,threading 不是退而求其次的方案,而是更直接、更可控、更符合 Unix 哲学(做一件事,并做好)的技术选择。它规避了 asyncio 的取消陷阱、共享状态难题和调试黑盒,让开发焦点回归业务逻辑本身。只要遵循事件通知、超时等待、资源隔离三原则,即可构建出健壮、可维护的实时流式服务。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

27

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

132

2026.02.06

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

653

2023.08.10

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

305

2025.12.24

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

23

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

24

2026.01.21

C# 多线程与异步编程
C# 多线程与异步编程

本专题深入讲解 C# 中多线程与异步编程的核心概念与实战技巧,包括线程池管理、Task 类的使用、async/await 异步编程模式、并发控制与线程同步、死锁与竞态条件的解决方案。通过实际项目,帮助开发者掌握 如何在 C# 中构建高并发、低延迟的异步系统,提升应用性能和响应速度。

88

2026.02.06

Python 多线程与异步编程实战
Python 多线程与异步编程实战

本专题系统讲解 Python 多线程与异步编程的核心概念与实战技巧,包括 threading 模块基础、线程同步机制、GIL 原理、asyncio 异步任务管理、协程与事件循环、任务调度与异常处理。通过实战示例,帮助学习者掌握 如何构建高性能、多任务并发的 Python 应用。

305

2025.12.24

c语言 数据类型
c语言 数据类型

本专题整合了c语言数据类型相关内容,阅读专题下面的文章了解更多详细内容。

4

2026.02.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
swoole入门物联网开发与实战
swoole入门物联网开发与实战

共15课时 | 1.3万人学习

swoole项目实战(第二季)
swoole项目实战(第二季)

共15课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号