0

0

如何在FastAPI应用中优雅地集成并管理异步TCP服务器

聖光之護

聖光之護

发布时间:2025-10-23 13:38:19

|

552人浏览过

|

来源于php中文网

原创

如何在fastapi应用中优雅地集成并管理异步tcp服务器

本文详细探讨了在FastAPI应用中,通过`lifespan`事件管理异步TCP服务器的正确方法。核心内容包括识别`lifespan`中`yield`关键字的关键作用,阐明了在应用启动阶段启动TCP服务器任务的必要性,并提供了如何创建、运行及优雅关闭这些异步TCP服务器任务的完整示例代码和专业指导,确保FastAPI与TCP服务能协同工作。

在FastAPI应用中集成异步TCP服务器

在构建现代Web服务时,有时我们需要将HTTP/WebSocket服务(如FastAPI)与底层协议服务(如TCP服务器)结合起来。本文将深入探讨如何在同一个FastAPI应用中,利用其异步特性和生命周期管理机制,优雅地启动、运行并关闭多个异步TCP服务器。

理解FastAPI的lifespan事件

FastAPI提供了lifespan事件管理功能,允许我们在应用启动(startup)和关闭(shutdown)时执行特定的异步任务。这通过contextlib.asynccontextmanager装饰器实现。其核心是yield关键字,它将lifespan函数分为两个阶段:

  1. 启动阶段:yield之前的代码会在应用启动时执行。
  2. 关闭阶段:yield之后的代码会在应用关闭时执行。

常见错误与原因分析:

在初始尝试中,如果将启动TCP服务器的代码放在yield之后,这些TCP服务器将不会在FastAPI应用启动时运行,而只会在应用尝试关闭时才被触发,这显然不符合预期。这就是为什么在应用启动后,TCP服务器的socket连接会失败的原因。

正确启动异步TCP服务器

要确保TCP服务器在FastAPI应用启动时同步运行,我们需要将启动逻辑放在yield关键字之前。同时,由于TCP服务器是长时间运行的服务,我们不能直接await它,否则会阻塞FastAPI的启动。正确的做法是使用asyncio.create_task将其作为后台任务运行。

以下是实现这一目标的详细步骤和代码示例。

1. 准备全局状态管理

为了在TCP服务器和WebSocket服务之间共享数据和连接,我们通常需要一个全局状态管理器。

globals.py:

LogoAi
LogoAi

利用AI来设计你喜欢的Logo和品牌标志

下载
import threading
from websocket_manager import WebSocketManager

# 存储共享数据
data_storage = {}
# 用于数据访问的线程锁
data_lock = threading.Lock()
# WebSocket连接管理器
websocket_manager = WebSocketManager()

2. 实现WebSocket连接管理器

这个管理器负责处理WebSocket连接的建立、断开和数据广播。

websocket_manager.py:

from fastapi import WebSocket
from typing import List

class WebSocketManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """建立WebSocket连接并添加到活动连接列表"""
        await websocket.accept()
        self.active_connections.append(websocket)
        print(f"WebSocket connected: {websocket.client}")

    def disconnect(self, websocket: WebSocket):
        """断开WebSocket连接并从活动连接列表移除"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            print(f"WebSocket disconnected: {websocket.client}")

    async def broadcast(self, data: str):
        """向所有活动WebSocket连接广播数据"""
        for connection in self.active_connections:
            try:
                await connection.send_text(data)
            except Exception as e:
                print(f"Error broadcasting to WebSocket {connection.client}: {e}")
                # 如果发送失败,可以考虑断开该连接
                self.disconnect(connection)

3. 实现异步TCP服务器逻辑

TCP服务器需要处理客户端连接,接收数据,并通过WebSocket管理器广播出去。为了实现优雅关闭,我们将TCP服务器的创建和运行逻辑进行调整,以便lifespan可以管理其生命周期。

server.py:

import asyncio
import globals

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """处理单个TCP客户端连接"""
    addr = writer.get_extra_info('peername')
    print(f"TCP client connected from {addr}")
    try:
        while True:
            data = await reader.read(1024) # 读取数据
            if not data:
                break # 客户端断开连接
            decoded_data = data.decode('utf-8', errors='ignore')
            print(f"Received from TCP {addr}: {decoded_data}")
            # 通过WebSocket广播接收到的数据
            await globals.websocket_manager.broadcast(decoded_data)
    except asyncio.CancelledError:
        print(f"TCP client handler for {addr} cancelled.")
    except Exception as e:
        print(f"Error handling TCP client {addr}: {e}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"TCP client {addr} disconnected.")

async def create_and_run_tcp_server(port: int):
    """
    创建并运行一个TCP服务器。
    此函数返回一个asyncio.Server实例,
    其serve_forever()方法将作为后台任务运行。
    """
    print(f"Attempting to start TCP server on port {port}...")
    server = await asyncio.start_server(handle_client, '0.0.0.0', port)
    print(f"TCP server started on port {port}")
    # serve_forever()是一个阻塞调用,需要通过create_task在后台运行
    # 并且在需要关闭时,调用server.close()来停止它
    await server.serve_forever()
    return server # 实际上,serve_forever会一直运行,直到被关闭,所以这里通常不会返回

4. 在FastAPI应用中集成TCP服务器

这是核心部分,我们将在main.py中定义FastAPI应用,并使用@asynccontextmanager来管理TCP服务器的生命周期。

main.py:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import globals
from server import create_and_run_tcp_server # 导入TCP服务器创建函数
from contextlib import asynccontextmanager

# 用于存储TCP服务器实例和其运行任务,以便在应用关闭时进行管理
tcp_servers = []
tcp_server_tasks = []

@asynccontextmanager
async def startup_event(app: FastAPI):
    """
    FastAPI应用的生命周期管理器。
    在yield之前启动所有后台服务,在yield之后处理服务关闭。
    """
    print("--- FastAPI Application Startup ---")
    ports = [8001, 8002, 8003] # 定义需要启动的TCP服务器端口

    # 启动TCP服务器
    print(f"Starting TCP servers on ports: {ports}")
    for port in ports:
        # 创建TCP服务器实例
        server_instance = await asyncio.start_server(globals.handle_client, '0.0.0.0', port)
        tcp_servers.append(server_instance)

        # 将服务器的serve_forever方法作为后台任务运行
        task = asyncio.create_task(server_instance.serve_forever())
        tcp_server_tasks.append(task)
        print(f"TCP server task created for port {port}")

    # 应用启动完成,现在可以处理请求
    yield

    # 应用关闭阶段:停止所有TCP服务器
    print("--- FastAPI Application Shutdown ---")
    print("Stopping TCP servers...")
    for server_instance in tcp_servers:
        server_instance.close() # 向TCP服务器发送关闭信号

    # 等待所有TCP服务器任务完成关闭
    # return_exceptions=True 确保即使某个任务关闭失败,其他任务也能继续等待
    await asyncio.gather(*tcp_server_tasks, return_exceptions=True)
    print("All TCP servers stopped gracefully.")
    print("--- FastAPI Application Shutdown Complete ---")


# 创建FastAPI应用实例,并指定lifespan管理器
app = FastAPI(lifespan=startup_event)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    FastAPI的WebSocket端点,用于客户端连接。
    """
    print("Attempting to connect to WebSocket...")
    await globals.websocket_manager.connect(websocket)
    print(f"WebSocket connected: {websocket.client}")
    try:
        while True:
            # 保持WebSocket连接活跃,并处理可能接收到的消息
            # 这里我们只是接收,不处理,因为数据流是从TCP到WebSocket
            message = await websocket.receive_text()
            print(f"Received from WebSocket {websocket.client}: {message}")
            # 如果需要,可以将WebSocket接收到的数据转发给TCP服务器
            # await some_tcp_client_writer.write(message.encode())
    except WebSocketDisconnect:
        print(f"WebSocket {websocket.client} disconnected.")
    except Exception as e:
        print(f"WebSocket Error for {websocket.client}: {e}")
    finally:
        globals.websocket_manager.disconnect(websocket)

运行应用

使用Uvicorn运行FastAPI应用:

uvicorn main:app --reload

当Uvicorn启动时,你将看到FastAPI和TCP服务器的启动日志。TCP服务器将监听在指定的端口(8001, 8002, 8003),并准备接收数据。当客户端连接到TCP服务器并发送数据时,数据将被转发到所有连接的WebSocket客户端。

注意事项与最佳实践

  1. 错误处理:在TCP客户端处理函数handle_client和WebSocket端点中,加入健壮的错误处理机制,以防止单个连接的故障影响整个服务。
  2. 资源清理:确保在lifespan的关闭阶段,所有启动的后台任务和资源都能被正确地关闭和释放。server.close()和await server.wait_closed()对于asyncio.Server是关键。
  3. 任务取消:对于更复杂的后台任务,除了使用_stop标志或close()方法外,还可以考虑使用task.cancel()来优雅地停止asyncio.Task。
  4. 日志记录:使用适当的日志记录来跟踪服务状态、连接事件和数据流,这对于调试和监控至关重要。
  5. 端口冲突:确保FastAPI应用和所有TCP服务器监听的端口不冲突。

总结

通过正确利用FastAPI的lifespan事件管理器和Python的asyncio库,我们可以无缝地将异步TCP服务器集成到FastAPI应用中。关键在于理解yield在lifespan中的作用,以及如何使用asyncio.create_task来启动后台任务,并实现优雅的关闭机制。这种集成方式为构建高性能、多协议的现代应用提供了强大的基础。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

28

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

251

2026.02.06

http500解决方法
http500解决方法

http500解决方法有检查服务器日志、检查代码错误、检查服务器配置、检查文件和目录权限、检查资源不足、更新软件版本、重启服务器或寻求专业帮助等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

494

2023.11.09

http请求415错误怎么解决
http请求415错误怎么解决

解决方法:1、检查请求头中的Content-Type;2、检查请求体中的数据格式;3、使用适当的编码格式;4、使用适当的请求方法;5、检查服务器端的支持情况。更多http请求415错误怎么解决的相关内容,可以阅读下面的文章。

449

2023.11.14

HTTP 503错误解决方法
HTTP 503错误解决方法

HTTP 503错误表示服务器暂时无法处理请求。想了解更多http错误代码的相关内容,可以阅读本专题下面的文章。

3407

2024.03.12

http与https有哪些区别
http与https有哪些区别

http与https的区别:1、协议安全性;2、连接方式;3、证书管理;4、连接状态;5、端口号;6、资源消耗;7、兼容性。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

2863

2024.08.16

Golang WebSocket与实时通信开发
Golang WebSocket与实时通信开发

本专题系统讲解 Golang 在 WebSocket 开发中的应用,涵盖 WebSocket 协议、连接管理、消息推送、心跳机制、群聊功能与广播系统的实现。通过构建实际的聊天应用或实时数据推送系统,帮助开发者掌握 如何使用 Golang 构建高效、可靠的实时通信系统,提高并发处理与系统的可扩展性。

27

2025.12.22

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

140

2026.01.19

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

44

2026.03.06

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 4.8万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号