Phaser是Java中支持多阶段同步与动态线程注册/注销的灵活协作工具,适用于复杂场景。它允许线程在每个阶段调用arriveAndAwaitAdvance()进行同步,通过register()或bulkRegister()动态加入,使用arriveAndDeregister()退出。重写onAdvance()可自定义阶段切换逻辑并决定是否终止流程。典型应用如分阶段数据处理:多个线程依次完成加载、处理、汇总任务,主线程协调各阶段推进。相比CountDownLatch和CyclicBarrier,Phaser支持重复使用、动态调整参与者数量及分层结构,适合并行计算、任务调度等需阶段性协同的系统。

在Java中,Phaser 是一种比 CountDownLatch 和 CyclicBarrier 更灵活的同步工具,适用于需要多阶段协作、动态参与线程数量的复杂场景。它允许线程在多个阶段(phase)中进行同步,并且支持线程动态注册和注销,非常适合实现复杂的线程协作。
Phaser的基本概念
Phaser 的核心思想是“阶段同步”。所有参与的线程必须到达当前阶段的屏障点,才能一起进入下一阶段。与 CyclicBarrier 不同的是,Phaser 支持:
- 动态添加或移除参与线程
- 多阶段同步(可重复使用)
- 每个线程可以决定是否继续参与后续阶段
- 支持分层(树形结构)以提高性能
基本用法:创建和注册线程
你可以通过构造函数指定初始参与者数量,也可以让线程在运行时注册自己。
Phaser phaser = new Phaser(3); // 初始有3个参与者for (int i = 0; i < 3; i++) { new Thread(() -> { System.out.println("Phase 1: " + Thread.currentThread().getName()); phaser.arriveAndAwaitAdvance(); // 等待其他线程完成第一阶段
System.out.println("Phase 2: " + Thread.currentThread().getName()); phaser.arriveAndAwaitAdvance(); // 第二阶段同步 System.out.println("Done: " + Thread.currentThread().getName()); phaser.arriveAndDeregister(); // 完成任务并注销 }).start();}
立即学习“Java免费学习笔记(深入)”;
上面的例子中,三个线程在两个阶段中同步执行。每次调用 arriveAndAwaitAdvance() 表示该线程到达当前阶段,并等待其他线程也到达。
动态注册与多阶段控制
有时你希望线程在运行过程中才加入协作,这时可以用 register() 或 bulkRegister(n) 动态增加参与者。
Phaser phaser = new Phaser();phaser.register(); // 主线程注册为一个参与者
new Thread(() -> { phaser.register(); // 子线程注册自己 System.out.println("Worker 1 - Phase 1"); phaser.arriveAndAwaitAdvance();
System.out.println("Worker 1 - Phase 2"); phaser.arriveAndAwaitAdvance(); phaser.arriveAndDeregister();}).start();
// 主线程推进两个阶段后结束 System.out.println("Main - Phase 1"); phaser.arriveAndAwaitAdvance();
System.out.println("Main - Phase 2"); phaser.arriveAndAwaitAdvance(); phaser.arriveAndDeregister();
这种模式适合工作线程不确定数量的场景,比如任务调度器动态派发任务。
自定义阶段行为:onAdvance() 方法
你可以继承 Phaser 并重写 onAdvance(int phase, int registeredParties) 方法,在每一阶段结束时执行自定义逻辑,甚至决定是否终止整个流程。
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第1阶段完成");
return false;
case 1:
System.out.println("第2阶段完成,准备结束");
return true; // 返回true表示终止,后续阶段不再进行
default:
return true;
}
}
};
phaser.register();
// ... 启动多个线程执行 arriveAndAwaitAdvance()
phaser.arriveAndAwaitAdvance(); // 阶段0
phaser.arriveAndAwaitAdvance(); // 阶段1,之后终止
onAdvance() 返回 true 会使得 Phaser 进入终止状态,之后的所有同步调用将立即返回。
实际应用场景示例:分阶段数据处理
假设有一组线程需要协同完成三阶段任务:加载数据 → 处理数据 → 汇总结果。
Phaser phaser = new Phaser(1); // 主线程作为协调者先注册Runnable workerTask = () -> { phaser.register(); // 注册为参与者
// 阶段1:加载数据 System.out.println(Thread.currentThread().getName() + " 加载数据"); phaser.arriveAndAwaitAdvance(); // 阶段2:处理数据 System.out.println(Thread.currentThread().getName() + " 处理数据"); phaser.arriveAndAwaitAdvance(); // 阶段3:汇总前准备 System.out.println(Thread.currentThread().getName() + " 准备就绪"); phaser.arriveAndAwaitAdvance(); phaser.arriveAndDeregister();};
for (int i = 0; i
// 主线程推动各阶段 phaser.arriveAndAwaitAdvance(); // 所有线程完成加载 phaser.arriveAndAwaitAdvance(); // 所有线程完成处理 phaser.arriveAndAwaitAdvance(); // 最终同步
System.out.println("所有阶段完成!"); phaser.arriveAndDeregister(); // 协调者注销
这种方式清晰划分了任务阶段,且每个线程可独立控制生命周期。
基本上就这些。Phaser 的灵活性让它在需要阶段性同步、动态协作的系统中非常有用,比如并行计算框架、游戏循环、批处理服务等。掌握它的关键在于理解“阶段”和“注册/注销”机制,合理设计 onAdvance 回调。不复杂但容易忽略细节。










