
本文详解如何通过服务端状态管理与主动推送机制,解决 flet 多用户访问时页面数据不同步问题,避免轮询导致的 ui 冻结,并确保所有已连接客户端实时显示最新计数。
在 Flet 中,每个 page 实例代表一个独立的客户端会话(如浏览器标签页),其 UI 状态默认不共享、不自动同步。你遇到的问题——用户 A 看到 Views: 1,用户 B 刷新后看到 Views: 2,但用户 A 的界面仍显示 1——本质是典型的客户端状态隔离 + 缺乏服务端广播机制所致。
原代码存在多个关键缺陷:
- ❌ page.update() 仅更新当前 page 实例,无法触达其他已连接客户端;
- ❌ 频繁 while True: page.update() 会阻塞主线程,导致事件响应失效(UI “卡死”);
- ❌ 文件 I/O 操作未加锁,多用户并发写入易引发数据竞争(如覆盖、错位);
- ❌ page.add(views) 在 route_change 外执行,逻辑位置错误,且未绑定动态更新逻辑。
✅ 正确解法不是“刷新当前页”,而是 “让所有活跃 page 共享并响应同一份权威状态”。Flet 提供了 page.pubsub(发布/订阅)机制,专为跨页面通信设计:
ShopNC多用户商城,全新的框架体系,呈现给您不同于以往的操作模式,更简约的界面,更流畅的搜索机制,更具人性化的管理后台操作,更适应现在网络的运营模式解决方案,为您的创业之路打下了坚实的基础,你们的需求就是我们的动力。我们在原有的C-C模式的基础上更增添了时下最流行的团购频道,进一步的为您提高用户的活跃度以及黏性提供帮助。ShopNC商城系统V2.4版本新增功能及修改功能如下:微商城频道A、商城
✅ 推荐方案:使用 pubsub 实现全局状态广播
import flet as ft
# 全局计数器(服务端单一事实源)
views_count = 0
def main(page: ft.Page):
global views_count
# 从文件初始化(生产环境建议用数据库或原子文件操作)
try:
with open("views", "r") as f:
views_count = int(f.read().strip())
except (FileNotFoundError, ValueError):
views_count = 0
# 创建文本控件(后续复用,避免重复创建)
count_text = ft.Text(f"Views: {views_count}")
# 定义接收更新的回调函数
def on_views_update(data):
nonlocal views_count
views_count = int(data)
count_text.value = f"Views: {views_count}"
page.update() # 仅更新当前 page
# 订阅全局频道(所有 page 都监听同一频道)
page.pubsub.subscribe("global_views", on_views_update)
# 页面首次加载时显示当前值
page.add(count_text)
# 路由变更时:更新全局状态并广播
def route_change(e: ft.RouteChangeEvent):
nonlocal views_count
views_count += 1
# 原子化持久化(推荐用 'w' 模式覆盖写入,更安全)
with open("views", "w") as f:
f.write(str(views_count))
# 向所有订阅者广播新值
page.pubsub.publish("global_views", str(views_count))
page.on_route_change = route_change
page.go("/") # 触发初始路由⚠️ 关键注意事项
- pubsub 是进程内通信:适用于单实例部署。若需多进程/多服务器扩展,须接入 Redis 或消息队列(如 redis-pubsub + 自定义广播层)。
- 文件操作务必原子化:避免 a+ 追加写(易错位),统一用 w 覆盖写,并配合 with 确保自动关闭。
- 禁止阻塞主线程:永远不要在 main() 或事件处理器中使用 while True 或长耗时同步操作;Flet 的事件循环是单线程的。
- 控件复用优于重建:count_text 实例在 page.add() 后持续存在,只需修改 .value 并调用 page.update(),性能更优。
- 初始化时机:page.pubsub.subscribe() 必须在 page.add() 之前调用,确保订阅就绪后再渲染 UI。
? 替代思路(轻量级轮询,仅作备选)
若因环境限制无法使用 pubsub,可采用低频客户端轮询(非推荐,但比无限循环安全):
# 在 main() 中添加(不推荐,仅演示概念)
def poll_updates():
import time
while page.is_alive: # 安全退出条件
try:
with open("views", "r") as f:
new_count = int(f.read().strip())
if new_count != views_count:
views_count = new_count
count_text.value = f"Views: {views_count}"
page.update()
except Exception:
pass
time.sleep(2) # 每2秒检查一次,避免过载
# 启动后台任务(Flet 1.21+ 支持 asyncio.create_task)
import asyncio
asyncio.create_task(poll_updates())? 总结:Flet 的实时同步核心在于 “状态上移 + 事件广播” —— 将数据源置于服务端(如全局变量 + 文件/DB),通过 pubsub 主动推送变更,而非让各客户端自行拉取。这既保障一致性,又维持响应性,是构建协作型 Flet 应用的标准实践。








