0

0

如何在 Reactor 中阻塞等待 Hot Flux 的下一个数据项

霞舞

霞舞

发布时间:2026-01-19 16:58:01

|

810人浏览过

|

来源于php中文网

原创

如何在 Reactor 中阻塞等待 Hot Flux 的下一个数据项

本文详解如何在不丢失实时性前提下,安全、精准地阻塞获取 hot flux 的“下一个”新发出的数据项,并覆盖无缓冲/有缓冲场景、线程安全限制及非阻塞替代方案。

在使用 Project Reactor 时,处理 Hot Flux(如 Flux.interval().share()、Sinks.multicast() 等)常面临一个关键挑战:你希望“暂停当前线程,直到下游真正发出下一个新值”,而非消费历史缓存或永远阻塞。blockFirst() 表面看似合适,但其行为取决于 Flux 的订阅时机与缓冲策略——对已开始发射的 Hot Flux,它可能立即返回旧值(若存在缓冲),或无限等待(若无缓冲且尚未发新值)。因此,正确做法需结合 Flux 的缓冲特性进行针对性设计。

✅ 场景一:无缓冲 Hot Flux(推荐直接使用 next().block() 或 blockFirst())

当 Flux 不保留历史(如 .share()、.multicast().directBestEffort()),所有订阅者仅接收订阅之后的新事件。此时 next().block() 与 blockFirst() 行为一致,均会阻塞至首个后续数据到达:

Flux hotFlux = Flux.interval(Duration.ofMillis(100))
    .map(i -> i.intValue())
    .share(); // 无缓冲热流

// 延迟 300ms 后,阻塞等待下一个整数(即第 3 或第 4 个,取决于调度精度)
Integer next = Mono.delay(Duration.ofMillis(300))
    .then(hotFlux.next()) // ← 关键:next() 返回 Mono,再 block()
    .block();
System.out.println("Received: " + next); // 如输出 3
⚠️ 注意:next() 比 blockFirst() 更灵活——它天然支持非阻塞链式调用(如 .cache().subscribe(...)),便于后续演进。

✅ 场景二:有缓冲 Hot Flux(必须跳过历史,只取“未来”值)

若 Flux 缓存了过往数据(如 .cache()、.replay(10)),直接 blockFirst() 会立刻返回最近缓存值,违背“等待下一个新值”的需求。此时应使用 skipUntilOther() 配合时间信号,将“跳过”逻辑锚定到订阅后的时间点

Flux bufferedHot = Flux.interval(Duration.ofMillis(100))
    .map(i -> i.intValue())
    .cache(); // 缓存全部历史

// 订阅后等待 500ms,再取第一个新值(跳过此前所有缓存+实时中已发出的项)
Integer futureValue = bufferedHot
    .skipUntilOther(Mono.delay(Duration.ofMillis(500)))
    .next()
    .block();
System.out.println("Next after 500ms: " + futureValue); // 如输出 5(第 6 个值)

? 原理:skipUntilOther 在 Mono.delay() 发出信号后才开始转发后续元素,确保跳过延迟期间所有已存在/已发出的数据。

⚠️ 重要限制:block() 并非万能,慎用线程上下文

Reactor 明确禁止在某些线程(如 parallel、boundedElastic 调度器线程)中调用 block(),否则抛出 IllegalStateException:

Magic Eraser
Magic Eraser

AI移除图片中不想要的物体

下载
// ❌ 危险!delay 默认在 parallel scheduler 上执行,内部 block 会失败
Mono.delay(Duration.ofMillis(200))
    .then(Mono.fromCallable(() -> hotFlux.blockFirst())) // → BLOCK FAILED!
    .subscribe();

✅ 正确做法:显式切换至支持阻塞的线程(如 Schedulers.boundedElastic()),或彻底避免阻塞(见下节)。

? 最佳实践:优先采用非阻塞方式(推荐)

阻塞操作违背响应式编程原则,易引发线程饥饿。更优雅的方案是预取并缓存目标值,供后续多次安全消费:

// 预先声明:500ms 后取下一个值,并缓存结果(含时间戳)
Mono> cachedNext = hotFlux
    .skipUntilOther(Mono.delay(Duration.ofMillis(500)))
    .next()
    .timed()
    .cache(); // ← 关键:只执行一次,结果可重用

// 后续任意位置安全获取(无阻塞、无重复计算)
cachedNext.subscribe(timed -> 
    System.out.println("Value: " + timed.get()));

总结

场景 推荐操作 关键要点
无缓冲 Hot Flux flux.next().block() 简洁可靠,依赖“订阅即起点”语义
有缓冲 Hot Flux flux.skipUntilOther(delay).next().block() 必须用时间信号锚定“未来”,跳过历史缓冲区
需要高并发/低延迟 cache() + subscribe() 彻底消除阻塞,提升系统吞吐与稳定性
调试/测试环境 可用 block(),但务必检查线程上下文 使用 Schedulers.boundedElastic() 包裹保障安全

牢记:Hot Flux 的“下一个”永远相对于你的订阅动作,而非全局时间轴。理解缓冲策略与订阅生命周期,是精准控制数据消费节奏的核心。

相关专题

更多
线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

481

2023.08.10

PS使用蒙版相关教程
PS使用蒙版相关教程

本专题整合了ps使用蒙版相关教程,阅读专题下面的文章了解更多详细内容。

23

2026.01.19

java用途介绍
java用途介绍

本专题整合了java用途功能相关介绍,阅读专题下面的文章了解更多详细内容。

15

2026.01.19

java输出数组相关教程
java输出数组相关教程

本专题整合了java输出数组相关教程,阅读专题下面的文章了解更多详细内容。

4

2026.01.19

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

2

2026.01.19

xml格式相关教程
xml格式相关教程

本专题整合了xml格式相关教程汇总,阅读专题下面的文章了解更多详细内容。

4

2026.01.19

PHP WebSocket 实时通信开发
PHP WebSocket 实时通信开发

本专题系统讲解 PHP 在实时通信与长连接场景中的应用实践,涵盖 WebSocket 协议原理、服务端连接管理、消息推送机制、心跳检测、断线重连以及与前端的实时交互实现。通过聊天系统、实时通知等案例,帮助开发者掌握 使用 PHP 构建实时通信与推送服务的完整开发流程,适用于即时消息与高互动性应用场景。

13

2026.01.19

微信聊天记录删除恢复导出教程汇总
微信聊天记录删除恢复导出教程汇总

本专题整合了微信聊天记录相关教程大全,阅读专题下面的文章了解更多详细内容。

93

2026.01.18

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

112

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
React 教程
React 教程

共58课时 | 3.8万人学习

国外Web开发全栈课程全集
国外Web开发全栈课程全集

共12课时 | 1.0万人学习

React核心原理新老生命周期精讲
React核心原理新老生命周期精讲

共12课时 | 1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号