0

0

使用 asyncio.wait 实现 WebSocket 广播:解决阻塞问题

聖光之護

聖光之護

发布时间:2025-08-19 20:42:17

|

172人浏览过

|

来源于php中文网

原创

使用 asyncio.wait 实现 websocket 广播:解决阻塞问题

本文旨在解决在使用 WebSocket 实现视频帧预测结果广播时遇到的客户端无法接收数据或接收延迟的问题。通过分析问题代码,并对比 websockets.broadcast() 和 asyncio.wait() 的行为,提供了一种基于 asyncio.wait() 的解决方案,并解释了两种方法之间的差异,帮助开发者更有效地构建实时数据推送服务。

问题分析

原始代码中使用 websockets.broadcast(clients, result) 在 while True 循环中广播预测结果。然而,多个客户端连接时,发现只有第一个客户端能正常接收数据,后续客户端则无法接收,或者需要在服务端程序停止后才能接收到数据。这表明 websockets.broadcast() 可能存在阻塞问题,导致服务端无法及时处理新的客户端连接和数据发送。

解决方案:使用 asyncio.wait()

将 websockets.broadcast(clients, result) 替换为 await asyncio.wait([ws.send(result) for ws in clients]) 解决了该问题。

修改后的服务端代码:

import websockets
import cv2
import asyncio
import time

def predict(image):
    # 替换为你的预测模型
    return "test"

async def echo(websocket, path):
    global vidCap, i
    try:
        while True:
            ret, image = vidCap.read()
            if ret:
                start = time.time()
                result = predict(image)
                # 使用 asyncio.wait 进行广播
                await asyncio.wait([ws.send(result) for ws in clients])
                end = time.time()
                print("exec time:%f s" % (end - start))
            else:
                # 视频读取结束或发生错误,退出循环
                break
            await asyncio.sleep(0) # 释放事件循环控制权,避免CPU占用过高
    except websockets.exceptions.ConnectionClosedError:
        print("Client disconnected unexpectedly.")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        clients.remove(websocket)


async def handler(websocket, path):
    clients.add(websocket)
    try:
        await echo(websocket, path)
    finally:
        if websocket in clients:
            clients.remove(websocket)

async def serve():
    start_server = await websockets.serve(handler, "localhost", 8765)
    await start_server.wait_closed()

if __name__ == '__main__':
    vidCap = cv2.VideoCapture('rtsp://xxx.xxx.xx') #rtsp or video
    clients = set()
    asyncio.run(serve())

客户端代码(保持不变):

import websockets
import asyncio
import time

async def get_result(uri):
    async with websockets.connect(uri) as websocket:
        while(True):
            try:
                start = time.time()
                recv_text = await websocket.recv()
                print(recv_text)
                end = time.time()
                print("exec:%f s" % (end - start))
            except websockets.exceptions.ConnectionClosedError:
                print("Server disconnected.")
                break # 退出循环
            except Exception as e:
                print(f"An error occurred: {e}")
                break

if __name__ == '__main__':
    asyncio.run(get_result("ws://127.0.0.1:8765/ws"))

关键修改说明:

  • await asyncio.wait([ws.send(result) for ws in clients]): 这行代码使用 asyncio.wait() 并发地向所有客户端发送消息。asyncio.wait() 接受一个 awaitable 对象的可迭代对象,并等待所有 awaitable 对象完成。 [ws.send(result) for ws in clients] 创建一个包含所有客户端发送消息任务的列表。

  • await asyncio.sleep(0): 在 while True 循环中添加 await asyncio.sleep(0) 可以将控制权交还给事件循环,避免 CPU 占用率过高。

    Sologo AI
    Sologo AI

    SologoAI 是一款AI在线LOGO生成工具,帮助用户快速创建独特且专业的品牌标识和配套VI设计。

    下载
  • 异常处理: 在 echo 和 get_result 函数中添加了 try...except 块来处理 websockets.exceptions.ConnectionClosedError 和其他潜在的异常,使得程序更加健壮。

  • 客户端断开处理: 在 handler 函数的 finally 块中,确保在客户端断开连接时,将其从 clients 集合中移除。

websockets.broadcast() vs asyncio.wait()

  • websockets.broadcast(): websockets.broadcast() 是一个方便的函数,用于将消息广播到所有连接的客户端。然而,它可能以阻塞的方式工作,这意味着它会逐个发送消息,并且在完成所有发送之前不会释放控制权。在快速循环中,这会导致服务端无法及时处理新的连接或响应其他事件。

  • asyncio.wait(): asyncio.wait() 允许并发地执行多个 awaitable 对象。通过将每个客户端的 ws.send(result) 操作作为一个独立的 awaitable 对象,asyncio.wait() 能够并行地发送消息,从而避免阻塞主事件循环。

简而言之,asyncio.wait() 提供了更细粒度的控制,允许异步地执行发送操作,从而提高了服务端的并发性能和响应能力。

注意事项

  1. 视频源: 确保视频源 'rtsp://xxx.xxx.xx' 可用,并根据实际情况进行替换。
  2. 预测模型: predict(image) 函数需要替换为实际的预测模型。
  3. 性能优化: 对于高并发场景,需要进一步优化预测模型的性能,并考虑使用更高效的数据编码方式(例如,protobuf)来减少网络传输开销。
  4. 错误处理: 完善错误处理机制,例如,在客户端断开连接时,服务端应该能够正确地处理异常,并清理资源。

总结

通过使用 asyncio.wait(),可以有效地解决 WebSocket 广播中的阻塞问题,实现高并发、低延迟的实时数据推送服务。理解 websockets.broadcast() 和 asyncio.wait() 的差异,有助于选择合适的广播策略,并优化 WebSocket 应用的性能。同时,良好的错误处理和资源管理也是构建健壮的 WebSocket 应用的关键。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

93

2023.09.25

Golang WebSocket与实时通信开发
Golang WebSocket与实时通信开发

本专题系统讲解 Golang 在 WebSocket 开发中的应用,涵盖 WebSocket 协议、连接管理、消息推送、心跳机制、群聊功能与广播系统的实现。通过构建实际的聊天应用或实时数据推送系统,帮助开发者掌握 如何使用 Golang 构建高效、可靠的实时通信系统,提高并发处理与系统的可扩展性。

20

2025.12.22

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

53

2026.01.19

PHP 高并发与性能优化
PHP 高并发与性能优化

本专题聚焦 PHP 在高并发场景下的性能优化与系统调优,内容涵盖 Nginx 与 PHP-FPM 优化、Opcode 缓存、Redis/Memcached 应用、异步任务队列、数据库优化、代码性能分析与瓶颈排查。通过实战案例(如高并发接口优化、缓存系统设计、秒杀活动实现),帮助学习者掌握 构建高性能PHP后端系统的核心能力。

99

2025.10.16

PHP 数据库操作与性能优化
PHP 数据库操作与性能优化

本专题聚焦于PHP在数据库开发中的核心应用,详细讲解PDO与MySQLi的使用方法、预处理语句、事务控制与安全防注入策略。同时深入分析SQL查询优化、索引设计、慢查询排查等性能提升手段。通过实战案例帮助开发者构建高效、安全、可扩展的PHP数据库应用系统。

86

2025.11.13

JavaScript 性能优化与前端调优
JavaScript 性能优化与前端调优

本专题系统讲解 JavaScript 性能优化的核心技术,涵盖页面加载优化、异步编程、内存管理、事件代理、代码分割、懒加载、浏览器缓存机制等。通过多个实际项目示例,帮助开发者掌握 如何通过前端调优提升网站性能,减少加载时间,提高用户体验与页面响应速度。

26

2025.12.30

c++ 根号
c++ 根号

本专题整合了c++根号相关教程,阅读专题下面的文章了解更多详细内容。

25

2026.01.23

c++空格相关教程合集
c++空格相关教程合集

本专题整合了c++空格相关教程,阅读专题下面的文章了解更多详细内容。

31

2026.01.23

yy漫画官方登录入口地址合集
yy漫画官方登录入口地址合集

本专题整合了yy漫画入口相关合集,阅读专题下面的文章了解更多详细内容。

119

2026.01.23

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号