0

0

运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

心靈之曲

心靈之曲

发布时间:2025-10-22 10:03:30

|

805人浏览过

|

来源于php中文网

原创

运行异步TCP服务器与FastAPI:统一事件循环下的应用集成

本文详细阐述了如何在fastapi应用中,利用其`lifespan`事件管理器,高效且优雅地集成多个异步tcp服务器。通过正确使用`asyncio.create_task`在应用启动时启动后台服务,并在应用关闭时实现这些服务的平滑终止,确保fastapi与自定义tcp服务在同一个事件循环中协同工作,实现数据从tcp到websocket的无缝转发。

1. 引言:FastAPI与异步服务的融合

在构建现代异步应用时,我们常常需要将Web服务(如基于FastAPI)与自定义的后台服务(如TCP服务器)结合起来。FastAPI以其高性能和异步特性而闻名,而Python的asyncio库则为构建并发网络应用提供了强大的支持。本文将探讨如何在同一个Python进程和事件循环中,无缝地运行一个FastAPI应用和多个异步TCP服务器,并实现数据在它们之间的流转,例如将TCP接收到的数据通过WebSocket广播给客户端。

2. 理解FastAPI的Lifespan事件管理器

FastAPI提供了lifespan事件管理器,这是一个基于contextlib.asynccontextmanager的强大工具,用于在应用程序启动和关闭时执行异步操作。其核心在于yield关键字:

  • yield之前的部分:在应用程序启动时执行。这里适合进行资源初始化、数据库连接、启动后台任务等操作。
  • yield之后的部分:在应用程序关闭时执行。这里适合进行资源清理、关闭连接、停止后台任务等操作。

常见误区:将需要持续运行的后台任务的启动逻辑放置在yield之后。这样做会导致任务仅在应用程序关闭时才尝试启动,而非在应用程序运行期间。

3. 正确集成异步TCP服务器的策略

为了让TCP服务器与FastAPI应用同时运行,并共享同一个事件循环,我们需要遵循以下策略:

  1. 使用 asyncio.create_task() 启动后台任务:将TCP服务器的启动协程包装成一个asyncio.Task。这会立即调度协程在事件循环中运行,而不会阻塞lifespan的启动流程。
  2. 在 yield 之前启动任务:确保所有需要随应用生命周期运行的后台任务都在lifespan的yield语句之前被创建并启动。
  3. 在 yield 之后实现优雅关闭:当应用收到关闭信号时(例如Ctrl+C或进程终止),lifespan的yield之后的部分会被执行。此时,我们应该取消之前启动的后台任务,并等待它们完成清理工作,以确保资源被正确释放。

4. 完整的代码示例

以下是根据上述策略修改后的代码,包括server.py, globals.py, websocket_manager.py 和 main.py。

websocket_manager.py (WebSocket连接管理)

# websocket_manager.py
from fastapi import WebSocket
from typing import List

class WebSocketManager:
    """
    管理活跃的WebSocket连接,并提供广播功能。
    """
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """建立WebSocket连接并将其添加到活跃连接列表。"""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """从活跃连接列表中移除断开的WebSocket连接。"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

    async def broadcast(self, data: str):
        """向所有活跃的WebSocket连接广播数据。"""
        # 遍历时创建一个副本以避免在迭代过程中修改列表
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception as e:
                print(f"Error broadcasting to WebSocket: {e}. Disconnecting...")
                self.disconnect(connection) # 广播失败则断开连接

globals.py (全局变量)

# globals.py
import threading
from websocket_manager import WebSocketManager

# 示例:全局数据存储和锁(当前示例中未使用,但保留结构)
data_storage = {}
data_lock = threading.Lock() # 注意:在asyncio环境中,通常应使用asyncio.Lock

# WebSocket管理器实例,供其他模块访问
websocket_manager = WebSocketManager()

server.py (异步TCP服务器)

# server.py
import asyncio
import globals

async def handle_client(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    """
    处理单个TCP客户端连接。
    从客户端读取数据,并通过WebSocketManager广播。
    """
    peername = writer.get_extra_info('peername')
    print(f"TCP client connected from {peername}")
    try:
        while True:
            data = await reader.read(1024) # 读取最多1024字节
            if not data:
                print(f"TCP client {peername} disconnected.")
                break
            # 将接收到的原始数据解码为UTF-8字符串并广播
            message = data.decode('utf-8', errors='ignore')
            print(f"Received from TCP {peername}: {message}")
            await globals.websocket_manager.broadcast(message)
    except asyncio.CancelledError:
        print(f"TCP client handler for {peername} cancelled.")
    except Exception as e:
        print(f"Error handling TCP client {peername}: {e}")
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"TCP client writer for {peername} closed.")

async def run_tcp_server_task(port: int):
    """
    启动一个TCP服务器,并在事件循环中运行。
    此函数设计为可取消的后台任务。
    """
    server = None
    try:
        print(f"Starting TCP server on 0.0.0.0:{port}...")
        server = await asyncio.start_server(handle_client, '0.0.0.0', port)
        async with server:
            await server.serve_forever() # 阻塞直到任务被取消
    except asyncio.CancelledError:
        print(f"TCP server on port {port} task cancelled.")
    except Exception as e:
        print(f"Error in TCP server on port {port}: {e}")
    finally:
        if server:
            server.close() # 关闭服务器套接字
            await server.wait_closed() # 等待服务器完全关闭
            print(f"TCP server on port {port} closed.")

main.py (FastAPI应用入口)

# main.py
from fastapi import FastAPI, WebSocket
import asyncio
from contextlib import asynccontextmanager
import globals # 导入全局变量
from server import run_tcp_server_task # 导入TCP服务器启动函数

@asynccontextmanager
async def startup_event(app: FastAPI):
    """
    FastAPI应用的生命周期事件管理器。
    在应用启动时启动TCP服务器,在应用关闭时停止它们。
    """
    print("Application startup: Initializing and starting background tasks...")

    # 定义需要启动的TCP服务器端口
    ports = [8001, 8002, 8003]
    # 为每个TCP服务器创建一个后台任务
    # 这些任务会在当前事件循环中并发运行
    tcp_server_tasks = [asyncio.create_task(run_tcp_server_task(port)) for port in ports]

    # `yield` 标志着应用启动完成,可以开始处理请求
    yield

    # `yield` 之后的部分在应用关闭时执行
    print("Application shutdown: Stopping background tasks...")
    # 取消所有TCP服务器任务
    for task in tcp_server_tasks:
        task.cancel()
    # 等待所有任务完成取消和清理工作
    # `return_exceptions=True` 确保即使有任务取消失败也不会阻塞其他任务
    await asyncio.gather(*tcp_server_tasks, return_exceptions=True)
    print("All background tasks stopped gracefully.")

# 使用lifespan事件管理器创建FastAPI应用
app = FastAPI(lifespan=startup_event)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    FastAPI的WebSocket端点。
    管理WebSocket连接,并在连接断开时进行清理。
    """
    print("WebSocket connection established.")
    await globals.websocket_manager.connect(websocket)
    try:
        # 保持WebSocket连接活跃,或处理来自客户端的WebSocket消息
        # 在此示例中,我们只接收消息以保持连接开放,实际应用中可能需要处理消息
        while True:
            await websocket.receive_text()
    except Exception as e:
        print(f"WebSocket Error: {e}")
    finally:
        globals.websocket_manager.disconnect(websocket)
        print("WebSocket connection closed.")

# 运行应用:uvicorn main:app --reload

5. 代码工作原理与注意事项

  1. lifespan的正确使用

    Teleporthq
    Teleporthq

    一体化AI网站生成器,能够快速设计和部署静态网站

    下载
    • 在main.py的startup_event中,asyncio.create_task(run_tcp_server_task(port))在yield之前被调用。这确保了TCP服务器任务在FastAPI应用启动时立即开始运行,并作为后台任务与FastAPI的HTTP/WebSocket服务共享同一个事件循环。
    • 当应用收到关闭信号时,yield之后的代码块被执行。此时,我们通过task.cancel()向每个TCP服务器任务发送取消信号,并使用asyncio.gather(*tcp_server_tasks, return_exceptions=True)等待所有任务优雅地完成其清理工作。
  2. TCP服务器的优雅关闭

    • run_tcp_server_task函数内部使用了await server.serve_forever(),这是一个阻塞调用。当其所在的任务被取消时,它会抛出asyncio.CancelledError。
    • try...except asyncio.CancelledError...finally块确保了即使任务被取消,server.close()和await server.wait_closed()也能被执行,从而正确关闭TCP服务器的套接字。
  3. WebSocketManager

    • 负责管理所有活跃的WebSocket连接。当TCP服务器接收到数据时,它会通过globals.websocket_manager.broadcast()将数据发送给所有连接的WebSocket客户端。
    • 在广播过程中,加入了错误处理,如果向某个WebSocket发送数据失败,会将其从活跃连接中移除,提高健壮性。
  4. 全局变量 (globals.py)

    • 用于在不同模块间共享WebSocketManager实例。
    • 注意,如果需要共享其他可变状态并在多个异步任务中访问,应优先使用asyncio.Lock而非threading.Lock,以避免阻塞事件循环。在本例中,data_storage和data_lock未被实际使用。
  5. 运行应用

    • 使用uvicorn main:app --reload命令即可启动FastAPI应用。Uvicorn会自动管理事件循环,并执行lifespan事件。

6. 总结

通过本文的指导,我们学习了如何利用FastAPI的lifespan事件管理器,在同一个事件循环中有效地运行FastAPI应用和多个异步TCP服务器。关键在于理解yield的语义,并使用asyncio.create_task来调度后台任务,同时实现任务的优雅启动和关闭。这种模式对于构建需要集成多种网络服务类型的复杂异步应用至关重要,它确保了资源的有效利用和应用的健壮性。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

758

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

639

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

761

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

618

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1265

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

548

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

579

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

708

2023.08.11

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

43

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 3.2万人学习

Django 教程
Django 教程

共28课时 | 3.2万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号