0

0

使用 asyncio.wait 实现 WebSocket 广播:解决阻塞问题

聖光之護

聖光之護

发布时间:2025-08-19 20:42:17

|

172人浏览过

|

来源于php中文网

原创

使用 asyncio.wait 实现 websocket 广播:解决阻塞问题

本文旨在解决在使用 WebSocket 实现视频帧预测结果广播时遇到的客户端无法接收数据或接收延迟的问题。通过分析问题代码,并对比 websockets.broadcast() 和 asyncio.wait() 的行为,提供了一种基于 asyncio.wait() 的解决方案,并解释了两种方法之间的差异,帮助开发者更有效地构建实时数据推送服务。

问题分析

原始代码中使用 websockets.broadcast(clients, result) 在 while True 循环中广播预测结果。然而,多个客户端连接时,发现只有第一个客户端能正常接收数据,后续客户端则无法接收,或者需要在服务端程序停止后才能接收到数据。这表明 websockets.broadcast() 可能存在阻塞问题,导致服务端无法及时处理新的客户端连接和数据发送。

解决方案:使用 asyncio.wait()

将 websockets.broadcast(clients, result) 替换为 await asyncio.wait([ws.send(result) for ws in clients]) 解决了该问题。

修改后的服务端代码:

import websockets
import cv2
import asyncio
import time

def predict(image):
    # 替换为你的预测模型
    return "test"

async def echo(websocket, path):
    global vidCap, i
    try:
        while True:
            ret, image = vidCap.read()
            if ret:
                start = time.time()
                result = predict(image)
                # 使用 asyncio.wait 进行广播
                await asyncio.wait([ws.send(result) for ws in clients])
                end = time.time()
                print("exec time:%f s" % (end - start))
            else:
                # 视频读取结束或发生错误,退出循环
                break
            await asyncio.sleep(0) # 释放事件循环控制权,避免CPU占用过高
    except websockets.exceptions.ConnectionClosedError:
        print("Client disconnected unexpectedly.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        clients.remove(websocket)


async def handler(websocket, path):
    clients.add(websocket)
    try:
        await echo(websocket, path)
    finally:
        if websocket in clients:
            clients.remove(websocket)

async def serve():
    start_server = await websockets.serve(handler, "localhost", 8765)
    await start_server.wait_closed()

if __name__ == '__main__':
    vidCap = cv2.VideoCapture('rtsp://xxx.xxx.xx') #rtsp or video
    clients = set()
    asyncio.run(serve())

客户端代码(保持不变):

import websockets
import asyncio
import time

async def get_result(uri):
    async with websockets.connect(uri) as websocket:
        while(True):
            try:
                start = time.time()
                recv_text = await websocket.recv()
                print(recv_text)
                end = time.time()
                print("exec:%f s" % (end - start))
            except websockets.exceptions.ConnectionClosedError:
                print("Server disconnected.")
                break # 退出循环
            except Exception as e:
                print(f"An error occurred: {e}")
                break

if __name__ == '__main__':
    asyncio.run(get_result("ws://127.0.0.1:8765/ws"))

关键修改说明:

  • await asyncio.wait([ws.send(result) for ws in clients]): 这行代码使用 asyncio.wait() 并发地向所有客户端发送消息。asyncio.wait() 接受一个 awaitable 对象的可迭代对象,并等待所有 awaitable 对象完成。 [ws.send(result) for ws in clients] 创建一个包含所有客户端发送消息任务的列表。

  • await asyncio.sleep(0): 在 while True 循环中添加 await asyncio.sleep(0) 可以将控制权交还给事件循环,避免 CPU 占用率过高。

    PaperFake
    PaperFake

    AI写论文

    下载
  • 异常处理: 在 echo 和 get_result 函数中添加了 try...except 块来处理 websockets.exceptions.ConnectionClosedError 和其他潜在的异常,使得程序更加健壮。

  • 客户端断开处理: 在 handler 函数的 finally 块中,确保在客户端断开连接时,将其从 clients 集合中移除。

websockets.broadcast() vs asyncio.wait()

  • websockets.broadcast(): websockets.broadcast() 是一个方便的函数,用于将消息广播到所有连接的客户端。然而,它可能以阻塞的方式工作,这意味着它会逐个发送消息,并且在完成所有发送之前不会释放控制权。在快速循环中,这会导致服务端无法及时处理新的连接或响应其他事件。

  • asyncio.wait(): asyncio.wait() 允许并发地执行多个 awaitable 对象。通过将每个客户端的 ws.send(result) 操作作为一个独立的 awaitable 对象,asyncio.wait() 能够并行地发送消息,从而避免阻塞主事件循环。

简而言之,asyncio.wait() 提供了更细粒度的控制,允许异步地执行发送操作,从而提高了服务端的并发性能和响应能力。

注意事项

  1. 视频源: 确保视频源 'rtsp://xxx.xxx.xx' 可用,并根据实际情况进行替换。
  2. 预测模型: predict(image) 函数需要替换为实际的预测模型。
  3. 性能优化: 对于高并发场景,需要进一步优化预测模型的性能,并考虑使用更高效的数据编码方式(例如,protobuf)来减少网络传输开销。
  4. 错误处理: 完善错误处理机制,例如,在客户端断开连接时,服务端应该能够正确地处理异常,并清理资源。

总结

通过使用 asyncio.wait(),可以有效地解决 WebSocket 广播中的阻塞问题,实现高并发、低延迟的实时数据推送服务。理解 websockets.broadcast() 和 asyncio.wait() 的差异,有助于选择合适的广播策略,并优化 WebSocket 应用的性能。同时,良好的错误处理和资源管理也是构建健壮的 WebSocket 应用的关键。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

107

2023.09.25

Golang WebSocket与实时通信开发
Golang WebSocket与实时通信开发

本专题系统讲解 Golang 在 WebSocket 开发中的应用,涵盖 WebSocket 协议、连接管理、消息推送、心跳机制、群聊功能与广播系统的实现。通过构建实际的聊天应用或实时数据推送系统,帮助开发者掌握 如何使用 Golang 构建高效、可靠的实时通信系统,提高并发处理与系统的可扩展性。

30

2025.12.22

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

142

2026.01.19

PHP 高并发与性能优化
PHP 高并发与性能优化

本专题聚焦 PHP 在高并发场景下的性能优化与系统调优,内容涵盖 Nginx 与 PHP-FPM 优化、Opcode 缓存、Redis/Memcached 应用、异步任务队列、数据库优化、代码性能分析与瓶颈排查。通过实战案例(如高并发接口优化、缓存系统设计、秒杀活动实现),帮助学习者掌握 构建高性能PHP后端系统的核心能力。

114

2025.10.16

PHP 数据库操作与性能优化
PHP 数据库操作与性能优化

本专题聚焦于PHP在数据库开发中的核心应用,详细讲解PDO与MySQLi的使用方法、预处理语句、事务控制与安全防注入策略。同时深入分析SQL查询优化、索引设计、慢查询排查等性能提升手段。通过实战案例帮助开发者构建高效、安全、可扩展的PHP数据库应用系统。

99

2025.11.13

JavaScript 性能优化与前端调优
JavaScript 性能优化与前端调优

本专题系统讲解 JavaScript 性能优化的核心技术,涵盖页面加载优化、异步编程、内存管理、事件代理、代码分割、懒加载、浏览器缓存机制等。通过多个实际项目示例,帮助开发者掌握 如何通过前端调优提升网站性能,减少加载时间,提高用户体验与页面响应速度。

36

2025.12.30

JavaScript浏览器渲染机制与前端性能优化实践
JavaScript浏览器渲染机制与前端性能优化实践

本专题围绕 JavaScript 在浏览器中的执行与渲染机制展开,系统讲解 DOM 构建、CSSOM 解析、重排与重绘原理,以及关键渲染路径优化方法。内容涵盖事件循环机制、异步任务调度、资源加载优化、代码拆分与懒加载等性能优化策略。通过真实前端项目案例,帮助开发者理解浏览器底层工作原理,并掌握提升网页加载速度与交互体验的实用技巧。

103

2026.03.06

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

25

2026.03.13

Python异步编程与Asyncio高并发应用实践
Python异步编程与Asyncio高并发应用实践

本专题围绕 Python 异步编程模型展开,深入讲解 Asyncio 框架的核心原理与应用实践。内容包括事件循环机制、协程任务调度、异步 IO 处理以及并发任务管理策略。通过构建高并发网络请求与异步数据处理案例,帮助开发者掌握 Python 在高并发场景中的高效开发方法,并提升系统资源利用率与整体运行性能。

44

2026.03.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号