C++造轮子飙车现场之无锁、有锁环形队列实现
创始人
2025-05-30 19:35:07
0

先看带锁的实现。

带锁版本

circular_queue.h

// 头文件防卫
#ifndef CIRCULAR_QUEUE_H
#define CIRCULAR_QUEUE_H#include  // 互斥量
#include  // 条件变量template 
class CircularQueue {
public:// 构造函数,初始化成员变量explicit CircularQueue(size_t capacity) :capacity_(capacity),size_(0),head_(0),tail_(0),buffer_(new T[capacity]) {}// 析构函数,释放 buffer_ 内存~CircularQueue() {delete[] buffer_;}// 判断队列是否为空bool empty() {std::unique_lock lock(mutex_);return size_ == 0;}// 判断队列是否已满bool full() {std::unique_lock lock(mutex_);return size_ == capacity_;}// 获取队列中元素的数量size_t size() {std::unique_lock lock(mutex_);return size_;}// 获取队列的容量size_t capacity() {return capacity_;}// 将元素加入队列,可能会阻塞bool push(const T& value, bool block = true) {std::unique_lock lock(mutex_);if (block) {// 如果队列已满,则等待队列不满while (size_ == capacity_) {not_full_.wait(lock);}} else {// 如果队列已满,则返回 falseif (size_ == capacity_) {return false;}}// 将元素加入队列尾部,并更新 tail_ 和 size_buffer_[tail_] = value;tail_ = (tail_ + 1) % capacity_;++size_;// 通知一个等待在 not_empty_ 条件变量上的线程not_empty_.notify_one();return true;}// 将元素加入队列,可能会阻塞,使用右值引用bool push(T&& value, bool block = true) {std::unique_lock lock(mutex_);if (block) {// 如果队列已满,则等待队列不满while (size_ == capacity_) {not_full_.wait(lock);}} else {// 如果队列已满,则返回 falseif (size_ == capacity_) {return false;}}// 将元素加入队列尾部,并更新 tail_ 和 size_buffer_[tail_] = std::move(value);tail_ = (tail_ + 1) % capacity_;++size_;// 通知一个等待在 not_empty_ 条件变量上的线程not_empty_.notify_one();return true;}// 从队列中取出元素,可能会阻塞bool pop(T& value, bool block = true) {std::unique_lock lock(mutex_);if (block) {// 如果队列为空,则等待队列不空while (size_ == 0) {not_empty_.wait(lock);}} else {// 如果队列为空,则返回 falseif (size_ == 0) {return false;}}// 取出队列头部元素,并更新 head_ 和 size_value = std::move(buffer_[head_]);head_ = (head_ + 1) % capacity_;--size_;// 通知一个等待在 not_full_ 条件变量上的线程not_full_.notify_one();return true;}private:const size_t capacity_; // 队列容量size_t size_; // 队列中元素的数量size_t head_; // 队列头部指针size_t tail_; // 队列尾部指针T* buffer_; // 队列缓冲区    std::mutex mutex_; // 互斥量,保护队列缓冲区和队列大小std::condition_variable not_full_; // 条件变量,当队列满时等待std::condition_variable not_empty_; // 条件变量,当队列空时等待
};#endif // CIRCULAR_QUEUE_H

push和pop接口不指定第二个参数的话,默认是阻塞的,这一点使用时需要注意。

以下是CircularQueue类的单元测试示例代码:

#include 
#include #include "circular_queue.h"TEST(CircularQueueTest, EmptyQueue) {CircularQueue queue(10);ASSERT_TRUE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_EQ(queue.size(), 0);ASSERT_EQ(queue.capacity(), 10);
}TEST(CircularQueueTest, PushAndPop) {CircularQueue queue(3);ASSERT_TRUE(queue.push(1));ASSERT_EQ(queue.size(), 1);ASSERT_FALSE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_TRUE(queue.push(2));ASSERT_EQ(queue.size(), 2);ASSERT_FALSE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_TRUE(queue.push(3));ASSERT_EQ(queue.size(), 3);ASSERT_FALSE(queue.empty());ASSERT_TRUE(queue.full());int value;ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, 1);ASSERT_EQ(queue.size(), 2);ASSERT_FALSE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, 2);ASSERT_EQ(queue.size(), 1);ASSERT_FALSE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, 3);ASSERT_EQ(queue.size(), 0);ASSERT_TRUE(queue.empty());ASSERT_FALSE(queue.full());ASSERT_FALSE(queue.pop(value,false));
}TEST(CircularQueueTest, PushAndPopWithBlocking) {CircularQueue queue(2);std::thread t([&queue]() {int value = 0;queue.pop(value);ASSERT_EQ(value, 1);queue.pop(value);ASSERT_EQ(value, 2);});ASSERT_TRUE(queue.push(1));ASSERT_TRUE(queue.push(2));ASSERT_TRUE(queue.push(3));t.join();
}TEST(CircularQueueTest, PushAndPopWithNonBlocking) {CircularQueue queue(2);int value;ASSERT_TRUE(queue.push(1));ASSERT_TRUE(queue.push(2));ASSERT_FALSE(queue.push(3, false));ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, 1);ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, 2);ASSERT_FALSE(queue.pop(value, false));
}TEST(CircularQueueTest, MovePushAndPop) {CircularQueue queue(3);ASSERT_TRUE(queue.push("hello"));ASSERT_TRUE(queue.push("world"));ASSERT_EQ(queue.size(), 2);std::string value;ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "hello");ASSERT_EQ(queue.size(), 1);ASSERT_TRUE(queue.push("foo"));ASSERT_EQ(queue.size(), 2);ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "world");ASSERT_EQ(queue.size(), 1);ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "foo");ASSERT_EQ(queue.size(), 0);
}TEST(CircularQueueTest, CopyPushAndPop) {CircularQueue queue(3);ASSERT_TRUE(queue.push(std::string("hello")));ASSERT_TRUE(queue.push(std::string("world")));ASSERT_EQ(queue.size(), 2);std::string value;ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "hello");ASSERT_EQ(queue.size(), 1);ASSERT_TRUE(queue.push(std::string("foo")));ASSERT_EQ(queue.size(), 2);ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "world");ASSERT_EQ(queue.size(), 1);ASSERT_TRUE(queue.pop(value));ASSERT_EQ(value, "foo");ASSERT_EQ(queue.size(), 0);
}TEST(CircularQueueTest, MultiThreadPushPop) {const int num_threads = 4;const int num_iterations = 10000;const int queue_size = 100;CircularQueue queue(queue_size);std::vector threads;for (int i = 0; i < num_threads; ++i) {threads.emplace_back([&queue, num_iterations]() {for (int j = 0; j < num_iterations; ++j) {queue.push(j);}});}for (int i = 0; i < num_threads; ++i) {threads.emplace_back([&queue, num_iterations]() {for (int j = 0; j < num_iterations; ++j) {int value;queue.pop(value);}});}for (auto& thread : threads) {thread.join();}ASSERT_EQ(queue.size(), 0);
}int main(int argc, char** argv) {testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}

单元测试运行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CqCh8DqE-1679157650605)(C:\Users\小静静\AppData\Roaming\Typora\typora-user-images\image-20230319001514719.png)]

无锁版本

上面的循环队列使用锁保证了线程安全。

以下是一个基于C++11的线程安全且无锁的环形队列实现,支持阻塞读和非阻塞读,写也一样。

#include 
#include 
#include 
#include 
#include 
#include template 
class RingQueue {
public:RingQueue() : read_idx_(0), write_idx_(0), data_{} {}bool Push(const T& item, bool block = false) { return PushImpl(item, block); }bool Push(T&& item, bool block = false) { return PushImpl(std::move(item), block); }bool Pop(T& item, bool block = false) {// 只有一个读者,read_idx_的读取可以不加锁size_t current_read_idx = read_idx_.load(std::memory_order_relaxed);// 保证读到write_idx_的变化,此处memory_order_acquire发挥的是可见性问题while (current_read_idx == write_idx_.load(std::memory_order_acquire)) {if (!block) {return false;}std::this_thread::yield();}item = std::move(data_[current_read_idx]); // 必须先把数据读走read_idx_才能+1,memory_order_release保证了对item的写不会被重排到read_idx_ + 1之后read_idx_.store(Next(current_read_idx), std::memory_order_release);return true;}template bool Pop(Func&& func, bool block = false) {size_t current_read_idx = read_idx_.load(std::memory_order_relaxed);while (current_read_idx == write_idx_.load(std::memory_order_acquire)) {if (!block) {return false;}std::this_thread::yield();}T item = std::move(data_[current_read_idx]);read_idx_.store(Next(current_read_idx), std::memory_order_release);func(std::move(item));return true;}void PopAsync(const T& value, std::function callback) {auto task = [this, value, callback]() {bool result = Pop(value, true);callback(result);};std::thread(std::move(task)).detach();}    bool IsEmpty() const {return read_idx_.load(std::memory_order_acquire) ==write_idx_.load(std::memory_order_acquire);}bool IsFull() const {return Next(write_idx_.load(std::memory_order_acquire)) ==read_idx_.load(std::memory_order_acquire);}private:template bool PushImpl(Item&& item, bool block = false) {// 只有1个写线程, 所以write_idx_可以不加锁size_t current_write_idx = write_idx_.load(std::memory_order_relaxed);size_t next_write_idx = Next(current_write_idx);// 读线程会修改read_idx_, 所以此处需要保证看到read_idx_的变化,此处memory_order_acquire保证的是可见性问题while (next_write_idx == read_idx_.load(std::memory_order_acquire)) {if(!block) {return false;}std::this_thread::yield();}// 数据的写入必须在write_idx_+1之前data_[current_write_idx] = std::forward(item);// 保证之前的写操作对读线程可见,即读线程能立刻看到data_刚写入的数据,当然也包括write_idx_的+1变化,memory_order_release会保证对data_的写入在write_idx_+1的操作之前完成。// 因为就算data_的赋值语句放在write_idx_+1之前,由于编译器或者运行期指令重排,并不一定能保证data_赋值语句就一定在write_idx_+1前执行。write_idx_.store(next_write_idx, std::memory_order_release);return true;}size_t Next(size_t current_idx) const { return (current_idx + 1) % (N+1); } // 此处笔者做了修改,N改成N+1std::atomic read_idx_;std::atomic write_idx_;std::array data_;  // 此处笔者做了修改,N改成N+1
};

代码中使用了模板参数T和N,支持了不同数据类型和不同队列大小的选择,借助读写指针两个原子变量实现无锁环形队列。

但需要注意的是,这个只能实现一读一写的线程安全,存在多个读者或者多个写者时就线程不安全了。

无锁编程的难点在于对几个内存时序的理解。

补充下关于内存时序操作的解释。

C++定义了几种内存时序,这些时序规定了原子变量前后的所有内存操作(包括普通变量、原子变量)如何排序

std::memory_order_relaxed只保正操作的原子性,对于同一个原子变量的多个操作之间不存在任何内存序的限制,也就是说,它们可以随意重排序,也可以在任意时刻被其他线程的操作所干扰。因此,使用std::memory_order_relaxed时需要特别小心,必须确保操作的正确性不受此种松散的内存访问顺序的影响。

std::memory_order_relaxed主要用于那些不需要任何同步机制的场合,比如计数器的自增、自减等操作,这些操作只需要保证结果的正确性,而不需要保证其执行的顺序。因此,std::memory_order_relaxed是最快的内存序,但也是最危险的一种内存序。


std::memory_order_acquire确保所有之前的读操作都已经完成,然后再执行当前读取操作。这意味着,如果当前读取操作需要用到之前的读取操作的结果,那么它将能够正确地获取到这些结果。

具体来说,当使用memory_order_acquire语义时,编译器和处理器都会保证当前线程所在的CPU核心(或处理器)在执行当前原子操作之前,会先将所有之前的读操作所获得的数据从CPU缓存中刷新到主内存中,以保证当前线程能够读取到其他线程对共享变量的最新修改。

使用memory_order_acquire语义可以保证程序的正确性,避免出现数据竞争的问题。但是,使用memory_order_acquire语义可能会降低程序的性能,因为它要求在执行原子操作之前,必须将所有之前的读操作都刷新到主内存中,这可能会导致缓存一致性协议的开销增加。因此,在实际编程中,应该根据具体情况选择合适的内存序语义。


std::memory_order_release确保当前线程的所有写操作在该原子操作之前都已经完成,并且将这些写操作对其他线程可见。这样,其他线程就可以看到当前线程对共享数据所做的更改。

这种释放操作通常用于同步操作,例如将一个共享变量的值更新后通知其他线程。在这种情况下,std::memory_order_release可以确保其他线程能够看到更新后的值。

Push提供了插入左值和右值两个版本,提高C++所谓的一点点性能,PushImpl提取了公共代码,实现代码复用,优雅!。

Pop提供了两个版本,有个变体,不返回pop出来的对象,而是调用外部传入的回调,对pop出来的对象进行操作。

同样附上单元测试。

#include 
#include "RingQueue.h"class RingQueueSingleThreadTest : public ::testing::Test {
protected:RingQueue queue_;
};TEST_F(RingQueueSingleThreadTest, PushAndPop) {int value = 0;EXPECT_FALSE(queue_.Pop(value));EXPECT_TRUE(queue_.Push(1));EXPECT_FALSE(queue_.IsEmpty());EXPECT_TRUE(queue_.Pop(value));EXPECT_EQ(value, 1);EXPECT_TRUE(queue_.IsEmpty());
}TEST_F(RingQueueSingleThreadTest, PushAndPopWithBlock) {int value = 0;std::thread t([&](){std::this_thread::sleep_for(std::chrono::milliseconds(100));queue_.Push(1, true);});EXPECT_TRUE(queue_.Pop(value, true));EXPECT_EQ(value, 1);t.join();
}TEST_F(RingQueueSingleThreadTest, PushAndPopWithFunc) {int value = 0;queue_.Push(1);queue_.Pop([&](int v){ value = v + 1; });EXPECT_EQ(value, 2);
}TEST_F(RingQueueSingleThreadTest, IsEmptyAndIsFull) {EXPECT_TRUE(queue_.IsEmpty());EXPECT_FALSE(queue_.IsFull());for (int i = 0; i < 10; ++i) {EXPECT_TRUE(queue_.Push(i));}EXPECT_TRUE(queue_.IsFull());EXPECT_FALSE(queue_.IsEmpty());int value = 0;EXPECT_FALSE(queue_.Push(10));EXPECT_TRUE(queue_.Pop(value));EXPECT_FALSE(queue_.IsFull());
}class RingQueueMultiThreadTest : public testing::Test {
protected:virtual void SetUp() {// 初始化数据for (int i = 0; i < 1000; ++i) {data_.push_back("data_" + std::to_string(i));}}std::vector data_;
};TEST_F(RingQueueMultiThreadTest, MultiThreadTest) {RingQueue queue;// 写线程std::thread writer([&queue, this]() {for (const auto& item : data_) {queue.Push(item, true);}});// 读线程std::thread reader([&queue, this]() {int count = 0;std::string item;while (count < 1000) {if (queue.Pop(item, true)) {EXPECT_EQ(item, "data_" + std::to_string(count));++count;} else {std::this_thread::yield();}}});writer.join();reader.join();
}int main(int argc, char** argv) {::testing::InitGoogleTest(&argc, argv);return RUN_ALL_TESTS();
}

在这里插入图片描述

相关内容

热门资讯

王凤英入职小鹏3年终获股权,此... 5月7日消息,小鹏汽车披露的监管及年报信息显示,公司总裁王凤英已正式进入股东名册,入职小鹏3年后股权...
五块钱红酒卖断货,便宜红酒为何... 最近一段时间,中国的酒类消费市场可以说是显得格外奇怪,一方面,各种高端酒特别是白酒的消费量出现了明显...
财联社C50风向指数调查:4月... 财联社5月8日讯(记者 夏淑媛)新一期财联社“C50风向指数”结果显示,市场机构对4月新增人民币贷款...
央视硬刚国际足联拒掏20亿,背... 作者| 史大郎&猫哥 来源| 是史大郎&大猫财经Pro 央视这次太刚了,离世界杯开幕还有1个月,死活...
新CEO上任直接放大招!Air... 快科技5月8日消息,苹果即将上任的CEO John Ternus对未来一系列新产品充满信心,称这些设...
“特朗普拟邀英伟达、波音等CE... 据路透社当地时间5月7日报道,特朗普政府正邀请英伟达、苹果、埃克森美孚、波音等大公司首席执行官,于下...
世界杯,还能看到直播吗? 2026年美加墨世界杯距离开幕,仅剩一个多月时间。多方信息显示,中央广播电视总台(以下简称“央视”)...
机构警告AI芯片热潮风险,超威... 5月7日,据央视财经,隔夜超威半导体公司(AMD)股价飙升近19%,带动AI芯片热潮持续升温。AMD...
银行员工转走储户1800万最新... 银行员工转走储户1800万最新进展:2名储户已收到银行全部款项
原创 中... 1994年,安徽省的经济格局曾发生过一次戏剧性的转折。在那一年,一座名为安庆的城市,其国内生产总值(...
昆都仑区:政策“蓄力”消费焕新 “一台5000多元的空调,叠加‘国补’和商场的以旧换新活动,能优惠1000元左右,旧机还能免费上门拆...
乐悦置业竞得佛山顺德乐从镇一商... 观点网讯:5月6日,佛山市顺德区乐从镇一商业地块成功出让,由广东省乐悦置业有限公司竞得,乐从南区·邻...
原创 亦... 《爱情没有神话》这部剧,一开始的命运颇为多舛,经历了几次撤档的波折后,终于在观众面前亮相,但其首播的...
美联储34年最大分歧叠加油价飙... 美联储按预期维持利率不变,但内部出现34年来最严重分歧,叠加布油创2022年6月以来新高,美债遭抛售...
支付宝消费券回收后,资金是否支... 摘要: 支付宝消费券回收变现后,资金能否直接转入信用卡?本文解答到账方式的相关规则,帮助用户了解资金...
中医介绍5个化痰穴位!收藏这篇... 很多人忽略了“痰”的危害,觉得咳几下就没事,殊不知,肺里的痰长期堆积,只会一步步加重身体负担。 中医...
黄金平台“杰我睿”涉嫌经济犯罪... 红星资本局5月7日消息,深圳水贝知名金店“杰我睿”兑付困难事件有了新进展。日前,深圳市公安局罗湖分局...
多地出台购房新政促楼市升温 记... 今年的“五一”假期,伴随着多个城市楼市新政密集落地,在叠加市场信心持续修复的作用下,房地产市场热度持...
谁是五一“吸金王”?这5座城市... 来源:市场资讯 (来源:21城市观) 哪座城市成为“五一”假期的大赢家? 图源:摄图网 作者|赵晓...
“低招低裁”格局稳固劳动力市场... 智通财经APP获悉,美国上周初请失业金人数在经历前一周回落至近几十年来最低水平后出现小幅反弹,表明尽...