线程安全队列通过互斥锁和条件变量实现,确保多线程环境下对队列的并发访问安全。使用std::mutex保护共享数据,std::condition_variable实现消费者等待机制,push操作入队并通知等待线程,wait_and_pop阻塞等待非空时出队,try_pop提供非阻塞出队方式,empty和size方法返回瞬时状态,适用于监控但不可用于逻辑控制。示例中生产者线程添加元素,消费者线程取出打印,避免忙等,提升效率。

在C++中实现一个线程安全的队列,通常需要结合互斥锁(std::mutex)和条件变量(std::condition_variable)来保护共享数据并实现线程间的高效同步。下面是一个基于标准库的线程安全队列的完整实现与说明。
线程安全队列的基本设计思路
多个线程可能同时对队列进行入队(push)和出队(pop)操作,因此必须保证对内部容器的访问是互斥的。同时,当队列为空时,消费者线程应等待新元素到来,而不是忙等。这正是条件变量发挥作用的地方。
核心组件包括:
- std::queue:作为底层容器存储元素。
- std::mutex:保护对队列的并发访问。
- std::condition_variable:用于阻塞消费者线程直到有数据可读。
代码实现:线程安全的队列模板类
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> data_queue;
mutable std::mutex mtx;
std::condition_variable cv;
public:
ThreadSafeQueue() = default;
void push(T value) {
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(std::move(value));
cv.notify_one(); // 唤醒一个等待的消费者
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (data_queue.empty()) {
return false;
}
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return data_queue.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return data_queue.size();
}
};
关键点解析
push 操作
立即学习“C++免费学习笔记(深入)”;
- 使用
std::lock_guard自动加锁,确保线程安全地将元素加入队列。 - 调用
notify_one()通知正在等待的消费者线程可以尝试取数据。
wait_and_pop 操作
- 使用
std::unique_lock配合cv.wait()实现阻塞等待。 - 条件变量的等待会自动释放锁,并在被唤醒后重新获取锁,避免忙等。
- 使用 lambda 判断队列非空,防止虚假唤醒。
try_pop 操作
- 非阻塞版本,适用于不想等待的场景。
- 返回布尔值表示是否成功取出元素。
empty 和 size 方法
- 虽然提供了这些方法,但在多线程环境下它们的返回值可能立即失效。
- 不应依赖
empty()的结果来判断是否调用wait_and_pop(),而应直接调用后者。
使用示例
#include <iostream>
void producer(ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 5; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void consumer(ThreadSafeQueue<int>& queue) {
for (int i = 0; i < 5; ++i) {
int value;
queue.wait_and_pop(value);
std::cout << "Consumed: " << value << '\n';
}
}
int main() {
ThreadSafeQueue<int> queue;
std::thread c(consumer, std::ref(queue));
std::thread p(producer, std::ref(queue));
p.join();
c.join();
return 0;
}
该示例展示了生产者线程向队列添加数字,消费者线程逐个取出并打印。由于使用了条件变量,消费者不会占用CPU资源空转。
基本上就这些。这个实现简单、高效,适合大多数多线程通信场景。注意不要过度暴露内部状态查询接口,重点是通过原子性的操作保证线程安全。











