实现无锁队列需用原子操作与内存序控制,C++中可借助std::atomic和CAS实现。1. 单生产者单消费者场景可用head和tail指针管理链表节点,生产者改tail,消费者改head,通过exchange更新指针。2. 多生产者时需用compare_exchange_weak循环重试确保线程安全,但高并发下可能性能下降。3. 内存序选用memory_order_acquire、release或acq_rel保证读写顺序,避免重排;ABA问题可通过版本号或带计数器指针解决。4. 数组实现的环形缓冲区适用于SPSC场景,利用模运算管理固定大小缓冲区,避免动态分配,提升效率。实际应用中应根据需求选择链表或数组方案,并优先使用成熟库以降低风险。

实现一个无锁队列(Lock-Free Queue)的关键是利用原子操作和内存顺序控制,避免使用互斥锁来保证线程安全。C++ 中可以通过 std::atomic 和 CAS(Compare-And-Swap)操作来构建高效的无锁单生产者单消费者或多个生产者的队列。
1. 基于链表的无锁队列(单生产者单消费者)
对于单生产者单消费者场景,可以使用简单的原子指针操作来实现高性能的无锁队列。
核心思路:- 用两个指针:head 和 tail,分别指向队列头和尾。
- 生产者只修改 tail,消费者只修改 head,减少竞争。
- 每个节点包含数据和下一个节点的指针。
代码示例:
#include#include template
class LockFreeQueue { private: struct Node { T data; std::atomic next; Node(const T& d) : data(d), next(nullptr) {} }; std::atomiczuojiankuohaophpcnNode*youjiankuohaophpcn head; std::atomiczuojiankuohaophpcnNode*youjiankuohaophpcn tail;public: LockFreeQueue() { Node* dummy = new Node(T{}); head.store(dummy, std::memory_order_relaxed); tail.store(dummy, std::memory_order_relaxed); }
~LockFreeQueue() { while (Node* h = head.load()) { head.store(h-youjiankuohaophpcnnext.load()); delete h; } } void push(const T& value) { Node* new_node = new Node(value); Node* old_tail = tail.exchange(new_node, std::memory_order_acq_rel); old_tail-youjiankuohaophpcnnext.store(new_node, std::memory_order_release); } bool pop(T& result) { Node* current_head = head.load(std::memory_order_acquire); Node* next_node = current_head-youjiankuohaophpcnnext.load(std::memory_order_acquire); if (next_node == nullptr) { return false; // 队列为空 } result = next_node-youjiankuohaophpcndata; head.store(next_node, std::memory_order_release); delete current_head; return true; }};
立即学习“C++免费学习笔记(深入)”;
2. 支持多生产者的无锁队列
当多个线程同时调用
push时,上面的实现可能出问题,因为tail.exchange()只能保证一个线程成功更新尾部。需要更复杂的 CAS 循环来确保正确性。改进 push 方法(多生产者安全):
void push(const T& value) { Node* new_node = new Node(value); Node* old_tail = tail.load();while (!tail.compare_exchange_weak(old_tail, new_node, std::memory_order_acq_rel)) { // 如果 tail 已被其他线程更新,则重试 } old_tail-youjiankuohaophpcnnext.store(new_node, std::memory_order_release);}
注意:这种方法在高并发下可能因大量 CAS 失败导致性能下降。
3. 内存顺序与 ABA 问题
内存顺序选择:
std::memory_order_acquire:用于读操作,确保之后的读写不会被重排到该操作之前。std::memory_order_release:用于写操作,确保之前的读写不会被重排到该操作之后。std::memory_order_acq_rel:同时具备 acquire 和 release 语义。ABA 问题:
当一个指针被释放并重新分配了相同地址的对象,CAS 操作无法察觉这种变化,可能导致逻辑错误。可通过引入“版本号”或使用
std::atomic_shared_ptr(非标准)缓解,或者手动封装带计数器的指针(如struct { Node* ptr; int version; })。4. 使用数组实现的无锁队列(环形缓冲区)
在单生产者单消费者场景中,基于数组的循环队列效率更高,且更容易避免动态内存分配。
templateclass RingBuffer { T buffer[Size]; std::atomic head {0}; // 生产者写入位置 std::atomic tail {0}; // 消费者读取位置 public: bool push(const T& item) { size_t current_head = head.load(); size_t next_head = (current_head + 1) % Size; if (next_head == tail.load()) { return false; // 队列满 } buffer[current_head] = item; head.store(next_head, std::memory_order_release); return true; }
bool pop(T& item) { size_t current_tail = tail.load(); if (current_tail == head.load()) { return false; // 队列空 } item = buffer[current_tail]; tail.store((current_tail + 1) % Size, std::memory_order_release); return true; }};
立即学习“C++免费学习笔记(深入)”;
注意:此版本适用于 SPSC(Single Producer Single Consumer),多生产者或多消费者需额外同步机制。
基本上就这些。根据实际需求选择链表还是数组实现,权衡通用性、性能和复杂度。无锁编程容易出错,建议充分测试并在关键路径上使用成熟的库(如
absl::IntrusiveList或folly::MPMCQueue)。











