线程安全队列通过互斥锁和条件变量实现,确保多线程环境下入队、出队操作的安全性与阻塞等待机制,满足生产者-消费者模型需求。

在多线程编程中,多个线程同时访问共享数据结构时容易引发竞争条件。队列作为常见的数据结构,在任务调度、生产者-消费者模型中广泛使用,因此实现一个线程安全的队列非常关键。C++ 提供了标准库中的 std::queue 和同步机制,结合互斥锁(std::mutex)和条件变量(std::condition_variable),我们可以设计一个高效且线程安全的队列。
线程安全队列的基本需求
一个线程安全的队列需要满足以下几点:
- 入队(push)操作线程安全:多个线程可以安全地向队列添加元素。
- 出队(pop)操作线程安全:多个线程可以从队列中取出元素,避免数据竞争。
- 阻塞等待机制:当队列为空时,消费线程应能阻塞等待,直到有新元素加入。
- 异常安全:操作过程中抛出异常不应导致死锁或资源泄漏。
使用互斥锁和条件变量实现
下面是一个基于 std::queue、std::mutex 和 std::condition_variable 的线程安全队列实现:
立即学习“C++免费学习笔记(深入)”;
#include <queue>
#include <mutex>
#include <condition_variable>
<p>template<typename T>
class ThreadSafeQueue {
private:
std::queue<T> data_queue;
mutable std::mutex mtx;
std::condition_variable cv;</p><p>public:
ThreadSafeQueue() = default;</p><pre class='brush:php;toolbar:false;'>void push(T value) {
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(std::move(value));
cv.notify_one(); // 唤醒一个等待的消费者
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (data_queue.empty()) {
return false;
}
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}
bool empty() const {
std::lock_guard<std::mutex> lock(mtx);
return data_queue.empty();
}
size_t size() const {
std::lock_guard<std::mutex> lock(mtx);
return data_queue.size();
}};
关键点说明
push 操作:使用 std::lock_guard 自动加锁,确保插入操作的原子性。插入后调用 notify_one() 唤醒一个等待的消费者线程。
try_pop 与 wait_and_pop 区别:
-
try_pop非阻塞,立即返回是否成功获取元素。 -
wait_and_pop会阻塞,直到队列非空,适合生产者-消费者场景。
条件变量的使用:在 wait_and_pop 中,cv.wait() 会自动释放锁并等待,当被唤醒时重新获取锁,并检查谓词 !data_queue.empty(),防止虚假唤醒。
mutable mutex:将互斥量声明为 mutable,是为了在 empty() 和 size() 这类 const 成员函数中也能加锁。
使用示例:生产者-消费者模型
#include <iostream>
#include <thread>
#include <vector>
<p>int main() {
ThreadSafeQueue<int> queue;
std::vector<std::thread> producers;
std::vector<std::thread> consumers;</p><pre class='brush:php;toolbar:false;'>// 启动消费者线程
for (int i = 0; i < 3; ++i) {
consumers.emplace_back([&queue]() {
for (int j = 0; j < 5; ++j) {
int value;
queue.wait_and_pop(value);
std::cout << "Consumed: " << value << "\n";
}
});
}
// 启动生产者线程
for (int i = 0; i < 2; ++i) {
producers.emplace_back([&queue, i]() {
for (int j = 0; j < 8; ++j) {
queue.push(i * 10 + j);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});
}
// 等待所有线程完成
for (auto& t : producers) t.join();
for (auto& t : consumers) t.join();
return 0;}
该示例展示了多个生产者向队列推送数据,多个消费者从队列取出并处理数据的过程。由于队列是线程安全的,整个过程无需额外同步控制。
基本上就这些。这个实现简单、清晰,适用于大多数多线程场景。如果对性能要求更高,还可以考虑无锁队列(lock-free queue),但实现复杂度显著上升。对于一般应用,基于锁的线程安全队列已经足够可靠。











