全部版块 我的主页
论坛 数据科学与人工智能 数据分析与数据科学 python论坛
124 0
2025-11-18

在高流量场景下,你是否遇到过这样的难题:使用互斥锁保护共享数据,却因线程频繁等待导致性能下降?例如高频交易系统的订单处理、服务器的请求队列,锁竞争导致的上下文切换成本,常常成为压垮性能的最后一根稻草。而无锁数据结构,正是解决这一问题的关键方案——它基于 C++ 原子操作,无需依赖传统的锁机制,即可实现线程安全的并发访问,使高并发代码真正提速。

你将学习如何用 std::atomic 封装原子指针、如何通过循环 CAS 操作解决并发冲突、如何利用内存序确保数据一致性,还将避免 ABA 问题、内存泄漏等常见陷阱。无论你是希望优化现有项目的并发性能,还是想深入了解原子操作的实际应用,这篇文章都能为你提供明确的指导。跟随实战步骤逐步推进,你会发现,无锁编程不仅能在高流量场景下显著提升性能,还能加深你对 C++ 并发编程的理解。

一、原子操作是什么?

原子操作,顾名思义,就像化学中的原子一样,是不可分割的基本单位。在计算机科学中,特别是在多线程编程的背景下,原子操作指的是那些在执行过程中不会被线程调度机制中断的操作。这意味着,一个原子操作要么完全执行,要么根本不执行,不存在中途被其他线程干扰的情况。

举个例子,你去银行办理转账业务,从账户 A 转一笔钱到账户 B。这个转账操作包括从账户 A 扣除金额和向账户 B 增加金额两个步骤。如果这是一个原子操作,那么这两个步骤要么一起成功,要么因为某些原因(如账户余额不足)一起失败。绝不会出现从账户 A 扣了钱,但钱没有转到账户 B 的尴尬情况。

在 C++ 中,原子操作通常由硬件指令直接支持,这使得它们在多线程环境中能高效地保证数据的一致性。例如,对于简单的变量赋值操作,如果使用原子操作,就可以确保在多个线程同时尝试赋值时,不会发生数据混乱的情况。像 std::atomic<int> a = 5; 这样的声明,就创建了一个原子整数变量 a,对它的操作都是原子的。

在多线程或多核环境中,若操作不具备原子性,可能会引发以下问题:

  • 竞态条件(Race Condition):多个线程同时修改共享数据,导致结果不可预测。
  • 数据不一致:如银行转账操作中,扣款和入账若被中断,可能导致金额错误。
  • 逻辑错误:如计数器递增操作被中断,导致计数丢失。

二、C++ 中的原子操作

2.1 std::atomic 类模板

C++11 的引入,为多线程编程带来了一个强大的工具——std::atomic 类模板。它像一个坚固的堡垒,为基本数据类型的原子操作提供了坚实的保障。这个类模板的设计理念非常直观,使开发者能够轻松地将原子操作集成到代码中。

使用 std::atomic 类模板非常简单,就像创建一个普通变量一样,只是需要在类型前加上 std::atomic。例如,要创建一个原子整数变量,可以这样写:std::atomic<int> atomicInt;。这就声明了一个名为 atomicInt 的原子整数,它的所有操作都是原子的,不会被其他线程打断。

下面通过一个简单的代码示例来展示 std::atomic 的基本用法:

#include?<iostream>
#include?<atomic>
#include?<thread>

std::atomic<int> sharedAtomicVariable(0);

void atomicIncrement() {
? ? for (int i = 0; i < 10000; ++i) {
? ? ? ? sharedAtomicVariable++;
? ? }
}

int main() {
? ? std::thread thread1(atomicIncrement);
? ? std::thread thread2(atomicIncrement);

? ? thread1.join();
? ? thread2.join();

? ? std::cout << "Final value of sharedAtomicVariable: " << sharedAtomicVariable << std::endl;
? ? return 0;
}

在这个示例中,sharedAtomicVariable 是一个原子整数变量。atomicIncrement 函数对它进行 10000 次递增操作。由于 sharedAtomicVariable 是原子的,所以在多线程环境下,它的递增操作不会出现数据竞争的问题。最终运行结果,sharedAtomicVariable 的值一定是 20000,这与前面非原子操作的例子形成了鲜明对比。

std::atomic 类模板还提供了一系列丰富的成员函数,用于实现各种原子操作。例如 fetch_add 函数,它可以原子地增加变量的值,并返回增加前的值;store 函数用于原子地存储一个新值;load 函数则用于原子地加载当前值。这些函数为开发者提供了更多的控制灵活性,以满足不同场景的需求。例如:

std::atomic<int> num(5);
int oldValue = num.fetch_add(3, std::memory_order_relaxed);
std::cout << "Old value: " << oldValue << ", New value: " << num << std::endl;?

这段代码中,fetch_add 函数将 num 的值增加 3,并返回原来的值 5,最终 num 的值变为 8。通过这些成员函数,我们可以根据具体的业务逻辑,选择最合适的原子操作方式。

2.2 原子操作的内存模型

在 C++ 的原子操作领域,内存模型是一个至关重要的复杂概念,它像一个无形的指挥家,控制着原子操作在内存中的可见性和顺序,对程序的正确性和性能有着深远的影响。C++ 定义了多种内存模型,每一种都有其独特的作用和适用场景。

首先是memory_order_relaxed,它是最宽泛的内存模型。在这种模型下,原子操作仅确保自身的原子性,不保障操作的内存顺序。这意味着,编译器和处理器可以依据自身的优化策略,自由地对指令进行重新排序。尽管这种模型在某些特定场合下能提升性能,但因缺乏对内存顺序的掌控,可能导致一些难以排查的问题。例如:
std::atomic<int> a(0);
std::atomic<bool> flag(false);

void threadFunction1() {
? ? a.store(1, std::memory_order_relaxed);
? ? flag.store(true, std::memory_order_relaxed);
}

void threadFunction2() {
? ? while (!flag.load(std::memory_order_relaxed));
? ? std::cout << "a: " << a.load(std::memory_order_relaxed) << std::endl;
}
在这个示例中,由于memory_order_relaxed的宽泛性,threadFunction1中a.store(1)和flag.store(true)的执行顺序可能会被重新排列。如果重新排列发生,threadFunction2可能在a还未被设置为 1 时就读取a的值,从而产生错误的结果。 与memory_order_relaxed不同,memory_order_acquire和memory_order_release则提供了更强的内存可见性保障。memory_order_acquire确保在此操作之前的所有内存读取都对其他线程可见,就像一道关卡,在它之后的操作不会被重排到它之前;而memory_order_release确保在此操作之后的所有内存写入都对其他线程可见,类似于一个屏障,在它之前的操作不会被重排到它之后。这两种内存模型常一起使用,以实现线程间的同步。 例如经典的生产者 - 消费者模型:
std::atomic<int> data(0);
std::atomic<bool> ready(false);

void producer() {
? ? data.store(42, std::memory_order_release);
? ? ready.store(true, std::memory_order_release);
}

void consumer() {
? ? while (!ready.load(std::memory_order_acquire));
? ? std::cout << "Data: " << data.load(std::memory_order_acquire) << std::endl;
}
在这个模型中,生产者线程在生产数据后,使用memory_order_release存储ready为true,确保数据写入对消费者线程可见;消费者线程在读取数据前,使用memory_order_acquire等待ready为true,确保能获取到最新的数据。 memory_order_acq_rel则同时具备memory_order_acquire和memory_order_release的特点,适用于那些既需读取又需写入的复杂场景。还有memory_order_seq_cst,它是默认的内存模型,提供最严格的顺序一致性保障,所有线程都能看到一致的操作顺序,但相应地,它的性能成本也相对较高。 选择适当的内存模型对于确保程序的正确性和性能至关重要。在实际编程中,我们需要根据具体的应用逻辑和性能需求,谨慎评估选择。如果对性能有极高要求,且数据一致性要求相对较低,可以考虑使用memory_order_relaxed;而对于需要严格保障数据一致性和线程同步的场景,则应选择更严格的内存模型。

2.3 原子操作的性能考量

原子操作虽然为多线程编程提供了强大的线程安全保证,但就像任何技术一样,它并非完美无瑕,在享受其带来的便利的同时,我们也需要关注它所带来的性能开销。原子操作的性能开销是由多种因素共同作用产生的,深入理解这些因素,有助于我们在编写高效的多线程程序时做出更明智的选择。 从硬件层面来看,原子操作依赖于特定的硬件指令,如比较并交换(CAS,Compare-And-Swap)指令。这些指令的执行通常比普通的内存操作成本更高,因为它们需要在多个处理器核心之间进行复杂的同步操作,以确保原子性。例如,在一个多核处理器系统中,当一个核心执行原子操作时,需要通过缓存一致性协议通知其他核心,使它们能够及时更新缓存中的数据,这一过程会带来额外的通信和同步成本。 缓存一致性也是影响原子操作性能的重要因素。为了确保数据的一致性,原子操作需要通过缓存一致性协议来同步不同核心之间的数据。当一个核心对原子变量进行修改时,不仅要更新自己的缓存,还需要通知其他核心的缓存,使其失效或更新。这就像是一场跨核心的信息传递接力赛,每次传递都需要消耗时间和资源,特别是在多核环境中,频繁的缓存同步会显著增加性能成本。 编译器优化在原子操作中也起着重要作用。编译器在生成原子操作的机器代码时,会进行一系列优化,尝试减少性能开销。但这些优化并不总能达到预期效果,特别是在复杂的场景下。例如,在某些情况下,编译器可能无法准确判断原子操作的依赖关系,导致优化过度或不足,从而影响程序性能。 为了减少原子操作的性能开销,我们可以采用一些有效的策略。首先,应尽量减少原子操作的使用。在不需要原子操作的场景下,避免不必要的原子变量声明和操作。比如,对于一些局部变量,它们不会被多个线程共享,就没有必要使用原子类型,使用普通变量即可,这样可以避免原子操作带来的额外开销。 选择适当的内存模型也是关键。正如前面提到的,不同的内存模型具有不同的性能特征。根据实际需求选择适当的内存模型,避免过度同步。如果程序对内存顺序的要求不高,可以选择像memory_order_relaxed这样的宽泛内存模型,以减少不必要的同步开销;而对于需要严格保障内存顺序的场景,再选择更严格的内存模型。

采用无锁数据结构是另一种高效的方法。在可行的情况下,用无锁数据结构替代传统的锁机制。无锁数据结构通过原子操作实现线程安全,避免了锁导致的上下文切换和竞争成本。例如,无锁链表、无锁队列等,它们在高并发环境中能显著提升性能。但需注意,无锁数据结构的实现通常较为复杂,需要开发者具备一定的技能和经验。

三、原子操作应用场景

3.1 计数器和标志位

在多线程编程领域,计数器和标志位如同交通信号灯,起到关键的同步作用。它们能够协调各线程的执行顺序,确保数据的精确性和程序的稳定。原子操作则为这些计数器和标志位提供了可靠的保障,使其在多线程环境中高效、安全地运行。

以一个简单的线程等待初始化操作为例,假设我们有一个复杂的系统,其中多个工作线程需要依赖某个资源的初始化。在这种情况下,可以使用原子标志位来实现线程间的同步。代码如下:

#include?<iostream>
#include?<thread>
#include?<atomic>

std::atomic<bool> initialized(false);

void initThread() {
? ? // 模拟初始化操作,可能包含复杂的计算或资源加载
? ? std::this_thread::sleep_for(std::chrono::seconds(2));
? ? initialized.store(true, std::memory_order_release);
}

void workerThread() {
? ? while (!initialized.load(std::memory_order_acquire)) {
? ? ? ? // 线程等待初始化完成,这里可以使用std::this_thread::yield()让出CPU时间片
? ? ? ? std::this_thread::yield();
? ? }
? ? std::cout << "Worker thread is starting to work." << std::endl;
? ? // 执行实际的工作任务
}

int main() {
? ? std::thread init(initThread);
? ? std::thread worker(workerThread);

? ? init.join();
? ? worker.join();

? ? return 0;
}

在这段代码中,initialized是一个原子布尔变量,初始状态为false,表示资源尚未初始化。initThread函数负责执行初始化操作,完成后将initialized设为true,并使用std::memory_order_release内存模型,确保所有之前的内存写入操作对其他线程可见。workerThread函数则在一个循环中不断检查initialized的值,只要它为false,就调用std::this_thread::yield()让出CPU时间片,避免不必要的CPU消耗。当initialized变为true时,工作线程知道资源已初始化完成,可以开始执行实际的任务。

这种使用原子标志位的方法,相比传统的锁机制,具有更高的效率和更低的成本。因为它避免了锁的加锁和解锁操作,减少了线程上下文切换的次数,从而提升了程序的整体性能。而且,原子操作的原子性保证了标志位的读写操作不会被其他线程中断,避免了数据竞争和不一致的问题。在实际应用中,计数器和标志位的原子操作被广泛应用于各种多线程场景,如资源管理和任务调度,是多线程编程中不可或缺的工具。

3.2 条件变量和事件

在多线程编程的复杂交响乐中,条件变量和事件像是精心编排的乐章,用于协调线程之间的执行顺序,确保各线程在合适的时间协同工作。原子操作在实现类似条件变量的功能时,发挥着关键作用,为线程间的高效同步提供了有力支持。

条件变量通常用于线程间的协调,允许一个或多个线程等待某个条件的发生,当条件满足时,相应的线程会被唤醒。在某些情况下,可以巧妙地使用原子操作来实现类似条件变量的功能。下面通过一个简单的示例代码来深入理解:

#include?<iostream>
#include?<thread>
#include?<atomic>
#include?<chrono::system_clock>

std::atomic<bool> ready(false);
std::atomic<int> data(0);

void producer() {
? ? // 模拟一些准备工作
? ? std::this_thread::sleep_for(std::chrono::seconds(2));
? ? data.store(42, std::memory_order_release);
? ? ready.store(true, std::memory_order_release);
}

void consumer() {
? ? while (!ready.load(std::memory_order_acquire)) {
? ? ? ? std::this_thread::yield();
? ? }
? ? std::cout << "Consumer got data: " << data.load(std::memory_order_acquire) << std::endl;
}

int main() {
? ? std::thread producerThread(producer);
? ? std::thread consumerThread(consumer);

? ? producerThread.join();
? ? consumerThread.join();

? ? return 0;
}

在这个示例中,ready是一个原子变量,充当了类似条件变量的角色,用于指示资源是否已准备好。producer线程负责准备数据,在完成准备工作后,将数据存储到data变量中,并将ready设为true,使用std::memory_order_release内存模型确保数据写入对其他线程可见。consumer线程则在一个循环中不断检查ready的值,只要它为false,就调用std::this_thread::yield()让出CPU时间片,避免不必要的CPU消耗。当ready变为true时,消费者线程知道数据已准备好,可以安全地读取data变量的值。

通过这种方式,原子操作实现了与条件变量类似的功能,即线程间的同步和等待。虽然这种方法相对简单,但在某些特定场景下,它可以提供比传统条件变量更高效的解决方案。因为原子操作避免了条件变量通常需要的锁机制,减少了线程上下文切换和锁竞争的成本,从而提高了程序的性能。当然,原子操作实现的条件变量功能相对有限,对于复杂的条件判断和多线程协作场景,传统的条件变量仍然是更好的选择。但在一些简单的同步需求下,原子操作的简洁性和高效性使其成为一种非常有价值的工具。

4.3 无锁数据结构

在传统的并发编程中,当多个线程需要访问共享数据时,我们通常会使用锁机制来确保数据的一致性和线程安全。就像多个线程在争夺一个共享的资源,锁就像是一把钥匙,只有拿到钥匙的线程才能进入“房间”(访问共享资源),其他线程只能在外面等待。这种方式虽然简单直接,但在高并发场景下,却会带来诸多问题。

无锁数据结构则打破了这种传统模式,它利用原子操作来实现并发控制。原子操作就像是一个个坚固的“堡垒”,在多线程环境中能够保证自身的操作是不可分割的,不会被其他线程干扰。无锁数据结构允许多个线程同时访问和修改数据结构,而不需要像传统锁机制那样进行等待。这就好比在一个没有交通信号灯(锁)的十字路口,车辆(线程)通过一套精心设计的规则(原子操作和算法)来协调通行,从而避免了因等待信号灯而造成的拥堵(锁竞争)。

在高度竞争的场景中,无锁数据结构的性能优势尤其突出。当多个线程同时争夺同一把锁时,传统的锁机制会导致很多线程处于停滞状态,等待锁的释放。这不仅增加了线程上下文切换的成本,还使 CPU 资源大量消耗在等待上。而无锁数据结构由于没有锁的约束,线程可以持续运行,不会因等待锁而被阻塞,从而显著提升了系统的吞吐量和反应速度。以一个多核处理器上运行的多线程服务器程序为例,采用无锁数据结构可以充分调动多核的并行计算能力,确保每个核心都能高效地处理线程任务,避免因锁竞争而导致的核心空闲。

(1)无锁栈
无锁栈是一种常见且实用的无锁数据结构,其实现策略巧妙地利用了原子操作。在无锁栈中,最重要的操作是入栈(push)和出栈(pop),这两个操作都依靠原子比较 / 交换(CAS - Compare And Swap)操作来确保线程的安全性。

下面是简化后的无锁栈的 C++ 实现示例:

template<typename T>
class lock_free_stack {
private:
? ? struct node {
? ? ? ? T data;
? ? ? ? node* next;
? ? ? ? node(const T& data) : data(data), next(nullptr) {}
? ? };

? ? std::atomic<node*> head; // 使用原子指针来指向栈顶

public:
? ? lock_free_stack() : head(nullptr) {}

? ? void push(const T& data) {
? ? ? ? node* new_node = new node(data);
? ? ? ? do {
? ? ? ? ? ? new_node->next = head.load(); // 保存当前栈顶指针
? ? ? ? } while (!head.compare_exchange_weak(new_node->next, new_node));?
? ? ? ? // 使用CAS操作尝试将新节点设置为栈顶,如果失败则重试
? ? }

? ? std::shared_ptr<T> pop() {
? ? ? ? node* old_head;
? ? ? ? do {
? ? ? ? ? ? old_head = head.load();
? ? ? ? ? ? if (!old_head) return nullptr; // 栈为空
? ? ? ? } while (!head.compare_exchange_weak(old_head, old_head->next));?
? ? ? ? // 使用CAS操作尝试将栈顶指针指向下一个节点,如果失败则重试
? ? ? ? std::shared_ptr<T> res = std::make_shared<T>(old_head->data);
? ? ? ? delete old_head;
? ? ? ? return res;
? ? }
};

在上述代码中,push操作首先创建一个新节点,然后通过compare_exchange_weak方法反复尝试将新节点设为栈顶。compare_exchange_weak方法会检查当前的head值(即预期值)是否与new_node->next(存储的当前栈顶指针)相等,如果相等,则将head更新为new_node(新值),表示入栈成功;如果不相等,则表明栈顶已被其他线程更改,compare_exchange_weak会将new_node->next更新为最新的head值,并返回false,随后再次进入循环重试。

pop操作同样使用compare_exchange_weak方法来尝试将栈顶指针移至下一个节点。如果成功,说明出栈操作完成,返回栈顶节点的数据;如果失败,说明栈顶已被其他线程更改,再次进入循环重试。

(2)无锁队列
无锁队列也是多线程环境中广泛使用的无锁数据结构,常用于实现生产者 - 消费者模型等场景。其工作原理同样基于原子操作,通过巧妙地管理队列的头指针和尾指针,实现高效的并发入队和出队操作。

以下是简化的无锁队列的 C++ 实现框架:

template<typename T>
class lock_free_queue {
private:
? ? struct node {
? ? ? ? std::shared_ptr<T> data;
? ? ? ? std::atomic<node*> next;
? ? ? ? node() : next(nullptr) {}
? ? };

? ? std::atomic<node*> head;
? ? std::atomic<node*> tail;

public:
? ? lock_free_queue() {
? ? ? ? node* dummy = new node();
? ? ? ? head.store(dummy);
? ? ? ? tail.store(dummy);
? ? }

? ? void push(const T& new_value) {
? ? ? ? std::shared_ptr<T> new_data = std::make_shared<T>(new_value);
? ? ? ? node* new_node = new node();
? ? ? ? new_node->data = new_data;
? ? ? ? node* old_tail;
? ? ? ? do {
? ? ? ? ? ? old_tail = tail.load();
? ? ? ? } while (!old_tail->next.compare_exchange_weak(nullptr, new_node));?
? ? ? ? // 尝试将新节点链入队列尾部
? ? ? ? tail.compare_exchange_strong(old_tail, new_node);?
? ? ? ? // 尝试更新tail指针
? ? }

? ? bool pop(T& result) {
? ? ? ? node* old_head = head.load();
? ? ? ? for (;;) {
? ? ? ? ? ? node* next = old_head->next.load();
? ? ? ? ? ? if (!next) return false; // 队列为空
? ? ? ? ? ? if (head.compare_exchange_weak(old_head, next)) {?
? ? ? ? ? ? // 尝试推进head指针
? ? ? ? ? ? ? ? result = std::move(*next->data);
? ? ? ? ? ? ? ? delete old_head;
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? }
? ? }
};

在此实现中,push操作首先创建一个新节点,并将新数据存入其中。接着,通过compare_exchange_weak方法尝试将新节点连接到队列的尾部(old_tail->next)。如果连接成功,再尝试使用compare_exchange_strong方法更新tail指针,使其指向新节点。即使更新tail指针失败,也不会影响队列的准确性,因为后续的push操作会协助推进tail指针。

pop操作则从队列头部开始,先获取当前的head指针,然后通过compare_exchange_weak方法尝试将head指针移动到下一个节点。如果移动成功,说明出队操作完成,提取并返回下一个节点的数据;如果移动失败,说明head指针已被其他线程更改,再次进入循环重试。

四、原子操作与非原子操作对比
为了更直观地体会原子操作的优势,我们通过具体实例和汇编指令来深入探讨原子变量和普通变量在多线程环境中的表现,了解原子操作如何在保证原子性和解决内存顺序问题上展现独特优势。

首先,我们看一个简单的计数器例子,对比普通变量和原子变量在多线程环境下的行为:

#include?<iostream>
#include?<thread>
#include?<atomic>

// 使用普通变量
int normalCounter = 0;
void normalIncrement() {
? ? for (int i = 0; i < 10000; ++i) {
? ? ? ? normalCounter++;
? ? }
}

// 使用原子变量
std::atomic<int> atomicCounter(0);
void atomicIncrement() {
? ? for (int i = 0; i < 10000; ++i) {
? ? ? ? atomicCounter++;
? ? }
}

int main() {
? ? std::thread t1Normal(normalIncrement);
? ? std::thread t2Normal(normalIncrement);

? ? t1Normal.join();
? ? t2Normal.join();
? ? std::cout << "Normal counter final value: " << normalCounter << std::endl;

? ? std::thread t1Atomic(atomicIncrement);
? ? std::thread t2Atomic(atomicIncrement);

? ? t1Atomic.join();
? ? t2Atomic.join();
? ? std::cout << "Atomic counter final value: " << atomicCounter << std::endl;

? ? return 0;
}

在此示例中,normalIncrement函数使用普通变量normalCounter进行递增操作,atomicIncrement函数则使用原子变量atomicCounter。当运行此代码时,会发现normalCounter的最终值通常小于 20000,这是因为normalCounter++这个操作不是原子的,它涉及读取、增加和写回三个步骤,在多线程环境下,这些步骤可能被其他线程中断,从而导致数据不一致。而atomicCounter作为原子变量,其递增操作是原子的,不会受到其他线程的干扰,因此最终atomicCounter的值必定是 20000。

接下来,我们通过查看汇编指令来深入了解原子变量和普通变量的底层区别。以atomicCounter.fetch_add(1)和normalCounter++为例,在 x86 架构下,使用 GCC 编译器编译后,它们的汇编指令大致如下:

; atomicCounter.fetch_add(1)的汇编指令示例
mov eax, 1
lock xadd dword ptr [atomicCounter], eax

; normalCounter++的汇编指令示例
mov eax, dword ptr [normalCounter]
add eax, 1
mov dword ptr [normalCounter], eax

从汇编指令可以观察到,atomicCounter.fetch_add(1)采用了lock xadd指令,lock前缀会在总线上加锁,在加锁期间,其他 CPU 核心无法通过总线访问内存,直至该指令完成,从而确保了操作的原子性。而normalCounter++则是常规的读取、加法和写入指令,没有提供任何原子性保障。如果在mov eax, dword ptr [normalCounter]和add eax, 1之间发生线程切换,另一个线程也执行相同的操作,可能会引发数据竞争和不一致。

再看一个涉及内存序的例子,假设我们有两个线程,一个线程负责写入数据和标志位,另一个线程负责读取数据和标志位:

#include?<iostream>
#include?<thread>
#include?<atomic>

std::atomic<int> data(0);
std::atomic<bool> flag(false);

// 写入线程
void writer() {
? ? data.store(42, std::memory_order_relaxed);
? ? flag.store(true, std::memory_order_relaxed);
}

// 读取线程
void reader() {
? ? while (!flag.load(std::memory_order_relaxed));
? ? std::cout << "Read data: " << data.load(std::memory_order_relaxed) << std::endl;
}

int main() {
? ? std::thread t1(writer);
? ? std::thread t2(reader);

? ? t1.join();
? ? t2.join();

? ? return 0;
}

在此例中,由于采用了memory_order_relaxed内存模型,编译器和处理器可以自由地对指令进行重排序。这可能导致writer线程中data.store(42)和flag.store(true)的执行顺序被重新排列,reader线程在读取数据时,可能会先读到flag为true,但此时data尚未被设置为42,从而读取到错误的数据。

如果我们把内存模型改为std::memory_order_release和std::memory_order_acquire:

// 写入线程
void writer() {
? ? data.store(42, std::memory_order_release);
? ? flag.store(true, std::memory_order_release);
}

// 读取线程
void reader() {
? ? while (!flag.load(std::memory_order_acquire));
? ? std::cout << "Read data: " << data.load(std::memory_order_acquire) << std::endl;
}

这样,writer线程在设置flag为true时,会确保之前对data的写入操作对其他线程可见;reader线程在读取flag为true之后,能够确保读取到最新的data值,从而确保了内存序的准确性。

通过上述案例和汇编指令的解析,我们可以清晰地看到,原子操作在多线程环境中能有效确保原子性和解决内存序问题,防止数据竞争和不一致的产生,为多线程编程提供了更为稳固的支持。

五、实战演练:构建无锁数据结构
5.1 选择合适的数据结构

在构建无锁数据结构时,选择适当的底层数据结构是至关重要的第一步,这好比为高层建筑打下坚实的基础。以无锁队列的实现为例,我们通常会选择单链表作为底层数据结构,这背后有多种考虑。

单链表具备灵活的内存分配特点,无需像数组那样在初始化时就分配连续的大块内存。在无锁队列中,随着元素的持续入队和出队,内存的动态分配和释放频繁发生。单链表可根据实际需求,每次在插入新元素时动态分配一个节点的内存空间,在删除元素时释放相应的节点内存,这种按需分配的方式显著提高了内存的使用效率,避免了数组可能遇到的内存浪费或不足的问题。

从链表的架构来看,单链表的节点通过指针依次连接,这种链式结构使插入和删除操作极为高效。对于无锁队列的入队操作,仅需在链表的末尾添加一个新节点;而出队操作则只需移除链表头部的节点。这两个操作的时间复杂度均为 O(1),相较于其他数据结构,如数组在插入和删除时可能需要移动大量元素,单链表在性能上展现出明显优势。

在多线程环境中,单链表的架构更有利于利用原子操作实现无锁算法。可以将节点中的指针定义为原子指针,这样在多个线程同时对链表进行操作时,通过原子操作可以确保指针的修改是原子性的,不会发生数据竞争和不一致的情况。例如,在入队操作中,通过原子比较/交换(CAS)操作来更新尾指针,确保新节点能准确地插入到链表末尾;在出队操作中,使用 CAS 操作更新头指针,确保从链表头部移除节点的操作是线程安全的。

5.2 实现无锁队列的关键操作

在无锁队列的实现中,入队(enqueue)和出队(dequeue)操作是其核心组成部分,它们的实现巧妙地运用了原子操作和一些特定的算法技巧。

(1)入队操作的实现步骤如下:

①准备新节点:首先创建一个新节点,用于存储即将入队的数据,并将其next指针初始化为nullptr。例如:

node* new_node = new node();
new_node->data = std::make_shared<T>(new_value);
new_node->next.store(nullptr);

②循环 CAS 链接新节点:通过一个循环,不断尝试使用 CAS 操作将新节点链接到当前尾节点的next指针上。在循环中,首先获取当前的尾节点old_tail,然后使用compare_exchange_weak方法尝试将old_tail->next从nullptr更新为new_node。如果 CAS 操作成功,表明新节点已成功链接到链表末尾,退出循环;若失败,则说明old_tail->next已被其他线程修改,此时需要更新old_tail为新的尾节点,再次尝试。代码如下:

node* old_tail;
do {
? ? old_tail = tail.load();
} while (!old_tail->next.compare_exchange_weak(nullptr, new_node));

③更新尾指针:在新节点成功链接后,尝试使用 CAS 操作更新尾指针,使其指向新节点。这里使用compare_exchange_strong方法,同样是为确保操作的原子性。即使更新尾指针失败,也不会影响队列的正确性,因为后续的入队操作会帮助推进尾指针。代码如下:

tail.compare_exchange_strong(old_tail, new_node);

(2)出队操作的实现步骤如下:

④获取当前头节点:首先获取当前的头节点old_head。由于头节点是一个哑元节点(dummy node),实际的数据存储在old_head->next指向的节点中。代码如下:

node* old_head = head.load();

⑤检查队列是否为空:如果old_head->next为nullptr,说明队列为空,直接返回false或nullptr。代码如下:

if (!old_head->next.load()) return false;

⑥循环 CAS 推进头指针:通过一个循环,不断尝试使用 CAS 操作将头指针推进到下一个节点。在循环中,首先获取old_head->next作为old_next,然后使用compare_exchange_weak方法尝试将head从old_head更新为old_next。如果 CAS 操作成功,说明出队操作成功,取出old_next中的数据并返回,同时删除old_head;如果失败,说明head已被其他线程修改,更新old_head为新的头节点,再次尝试。代码如下:

for (;;) {
? ? node* old_next = old_head->next.load();
? ? if (!old_next) return false;
? ? if (head.compare_exchange_weak(old_head, old_next)) {
? ? ? ? result = std::move(*old_next->data);
? ? ? ? delete old_head;
? ? ? ? return true;
? ? }
}

在无锁队列的实现中,还引入了“协助”机制。当一个线程在进行入队或出队操作时,如果发现其他线程的操作导致数据结构处于不一致的状态(例如,入队时发现尾指针没有及时更新),它会协助其他线程完成操作,从而确保整个队列的正常运行。这种“协助”机制是无锁数据结构能够在多线程环境下正确工作的关键保障之一。

5.3性能测试与分析

为了评估无锁队列的性能优势,我们使用 Google Benchmark 这个强大的性能测试工具进行性能测试。Google Benchmark 提供了简洁易用的接口,可以方便地对各种函数和数据结构进行性能测试,并且能够生成详细的测试报告,帮助我们准确地了解不同实现的性能表现。

在测试中,我们将无锁队列与传统的锁保护队列(使用std::mutex保护的std::queue)进行对比。测试场景设置为多线程环境,分别测试在不同线程数量(如 2 个线程、4 个线程、8 个线程等)下,两种队列的入队和出队操作的性能表现。测试代码如下:

#include?<benchmark/benchmark.h>
#include?"lock_free_queue.h" // 假设无锁队列的头文件
#include?<queue>
#include?<mutex>

// 定义锁保护队列
class mutex_protected_queue {
private:
? ? std::queue<int> q;
? ? std::mutex mtx;
public:
? ? void push(int value) {
? ? ? ? std::lock_guard<std::mutex> guard(mtx);
? ? ? ? q.push(value);
? ? }
? ? bool pop(int& result) {
? ? ? ? std::lock_guard<std::mutex> guard(mtx);
? ? ? ? if (q.empty()) return false;
? ? ? ? result = q.front();
? ? ? ? q.pop();
? ? ? ? return true;
? ? }
};

// 无锁队列的性能测试函数
static void BM_LockFreeQueue(benchmark::State& state) {
? ? lock_free_queue<int> q;
? ? for (auto _ : state) {
? ? ? ? q.push(42);
? ? ? ? int result;
? ? ? ? q.pop(result);
? ? }
}
BENCHMARK(BM_LockFreeQueue)->Threads(2)->Threads(4)->Threads(8);

// 锁保护队列的性能测试函数
static void BM_MutexProtectedQueue(benchmark::State& state) {
? ? mutex_protected_queue q;
? ? for (auto _ : state) {
? ? ? ? q.push(42);
? ? ? ? int result;
? ? ? ? q.pop(result);
? ? }
}
BENCHMARK(BM_MutexProtectedQueue)->Threads(2)->Threads(4)->Threads(8);

BENCHMARK_MAIN();

测试结果表明,在低并发场景下,由于无锁队列存在一定的“忙等”开销,即线程在 CAS 操作失败时会不断重试,而锁保护队列在获取锁成功后可以直接进行操作,所以两者的性能差距并不明显,有时锁保护队列甚至表现得更快一些。但随着线程数量的增加,进入高并发场景后,锁保护队列的性能急剧下降。

这是因为大量线程同时竞争同一把锁,导致许多线程处于阻塞状态,等待锁的释放,从而增加了线程上下文切换的开销,使得 CPU 资源大量浪费在等待上。而无锁队列由于没有锁的限制,线程可以持续执行,通过原子操作和“协助”机制来协调对共享资源的访问,避免了线程阻塞,其性能优势得以充分体现,吞吐量明显高于锁保护队列,能够更高效地处理多线程并发访问的情况。

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群