生产者消费者模型的核心约束是:多个线程共享有限容量缓冲区,生产者只put、消费者只take,满则生产者阻塞、空则消费者阻塞,且所有操作线程安全。

什么是生产者消费者模型的核心约束
它不是一种具体代码写法,而是一组必须满足的并发约束:多个线程共享一个有限容量的缓冲区;生产者线程只负责往缓冲区put数据,消费者线程只负责从缓冲区take数据;缓冲区满时生产者必须阻塞或等待,空时消费者必须阻塞或等待;所有操作必须线程安全,不能出现数据错乱或重复消费。
用 BlockingQueue 实现最简可靠版本
Java 标准库的 BlockingQueue 接口(如 ArrayBlockingQueue、LinkedBlockingQueue)已封装了等待/唤醒、锁、容量控制等全部逻辑,是首选方案。
-
put()方法在队列满时自动阻塞,take()在空时自动阻塞,无需手动wait()/notify() - 构造时指定容量(如
new ArrayBlockingQueue),超出即触发阻塞,天然防止内存溢出(10) - 所有操作原子性由实现类保证,不用额外同步 —— 但注意:若需在
put/take前后加日志或状态更新,这部分仍需自行同步
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { this.queue = q; }
public void run() {
try {
for (int i = 0; i < 5; i++) {
String item = "msg-" + i;
System.out.println("Producing: " + item);
queue.put(item); // 自动阻塞
Thread.sleep(100);
}
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { this.queue = q; }
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
String item = queue.take(); // 自动阻塞
System.out.println("Consuming: " + item);
Thread.sleep(200);
}
} catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
}
手写 synchronized + wait/notify 的关键陷阱
仅用于理解原理,不推荐生产使用。最容易出错的是:wait() 被虚假唤醒、条件判断用 if 而非 while、notify() 误用为 notifyAll() 导致竞争失控。
- 必须用
while循环检查条件,不能用if—— 因为wait()返回后状态可能已变 -
notify()只唤醒单个等待线程,但无法保证唤醒的是对应角色(比如生产者调用notify()却唤醒了另一个生产者),应统一用notifyAll() - 所有对共享变量(如
queue.size()、queue.isEmpty())的读写,必须包裹在同一个synchronized块中,且锁对象一致
ReentrantLock + Condition 替代方案的适用场景
当需要更精细的控制(例如区分“生产者等待队列”和“消费者等待队列”,避免不必要的唤醒)时,用 ReentrantLock 配合两个 Condition 更合适。
立即学习“Java免费学习笔记(深入)”;
-
lock.newCondition()创建独立等待队列,生产者await()在notFull上,消费者await()在notEmpty上 -
notFull.signal()只唤醒等待空间的生产者,notEmpty.signal()只唤醒等待数据的消费者,减少无效调度 - 必须显式
lock.lock()/unlock(),且unlock()必须放在finally块里,否则极易死锁
真正难的不是写出来,而是想清楚:你是否真的需要这种粒度的控制?多数业务场景下,BlockingQueue 的语义足够清晰,出错率更低。











