基于锁的并发数据结构

互斥锁提供了互斥:一次只能有一个线程获得互斥锁。互斥锁保护数据结构是通过显式地阻止对它所保护数据的真正并发访问来实现的。这称为串行化:线程轮流访问被互斥锁保护的数据。它们必须串行而非并发的访问它。

因此,必须仔细考虑数据结构的设计,使得能够真正的并发访问。虽然有些数据结构比其他数据结构具有更大的并发范围,但在所有情况下,其思想都是相同的:受保护的区域越小,串行化的操作就越少,并发的潜力也就越大。

设计并发数据结构的指南

当设计并发访问的数据结构时,需要考虑两个方面:确保访问安全以及允许真正的并发访问。

(一)如何确保访问安全?

  • 确保没有线程能够看到数据结构的不变量被另一个线程破坏的状态。
  • 通过提供完整操作的函数,而非一个个操作步骤的函数来小心避免接口固有的竞争条件。
  • 注意数据结构在有异常时的行为,从而确保不变量不会被破坏。
  • 通过限制锁的范围以及避免嵌套锁,将死锁的概率降到最低。

(二)如何允许真正的并发访问?

  • 是否可以限制锁的作用范围,以允许操作的某些部分在锁外执行?
  • 数据结构不同部分能否被不同的互斥锁保护?
  • 所有的操作需要同一级别的保护吗?
  • 是否可以对数据结构进行简单的修改,以增加并发访问的机会,并且不影响操作语义?

基于锁的并发数据结构

使用锁的线程安全栈

有锁安全栈的初版

代码地址:有锁安全栈的初版

如你所见,基本的线程安全是通过使用互斥锁 m 上的锁保护每个成员函数提供的。这将确保在任何时候只有一个线程在访问数据,因此只要每个成员函数保持不变量,就没有线程能看到被破坏的不变量。

在 empty() 和 pop() 成员函数之间有潜在的竞争条件。即在调用 empty() 方法和 pop() 方法之间有其他线程同时修改栈的状态(例如,将一个元素弹出),这可能导致 empty() 方法返回 false,而随后调用的 pop() 方法却发现栈已经为空,从而导致未定义行为。不过我们的代码会在 pop() 函数持有锁的时候,显式的查询栈是否为空,所以这里的竞争条件没有问题。这意味着即使有其他线程尝试访问栈,只有在 pop() 方法完成时,锁才会释放,因此不会出现问题。

栈中也有一些潜在抛异常的地方。对互斥锁上锁可能会抛出异常,但这种情况不仅极其罕见的(这意味着互斥锁有问题,或者缺乏系统资源),而且它是每个成员函数的第一个操作。由于没有数据被修改,所以是安全的。解锁互斥锁不会失败,所以总是安全的,并且使用 std::lock_guard<> 确保了互斥锁不会一直处于上锁的状态。

对 m_stack.push() 的调用可能会抛出一个异常,只要拷贝/移动数据值抛出一个异常,或者可分配的内存不足。不管是哪种情况,std::stack<> 都能保证是安全的,所以也没有问题。

empty() 不会修改任何数据,所以也是异常安全的。

改进建议如下:

  1. 无需单独维护一个 m_size 变量,作者提供的代码中根据 stack 原生的接口 empty 和 size 方法实现 安全判空和 获取栈大小。即 单独维护一个 m_size 变量实属多余,应当删除以及相关代码。
  2. 作者提供的代码中提供有异常代码,用于在 pop 栈为空的情况下抛出异常,也就是我们的线程安全栈需要内部对这个问题进行异常检测,这也是该接口的责任。只不过,我们不在内部处理异常,而是选择抛出这个异常,由上一层函数去处理或继续抛出(main 无论如何都得处理,不能继续抛出了)。
  3. std::move 用以转移对象到另一个对象,可以节省内存资源。push 应该支持 把传递的对象 std::move 进来,这同样就要求添加对象内部要自实现 移动构造函数和移动赋值运算符。
  4. 我们的 pop 和 STL 中的有些不同,即是有返回值,返回弹出的对象。既然是要删除它,其生命周期需要被管理,用智能指针管理即可。
  5. 作者还额外提供一种 pop 接口,即传递一个引用参数,这个参数用以存储 top 元素,但是该接口没有返回值,这边也添加该接口。

有锁安全栈的第一次优化

代码地址:有锁安全栈的第一次优化

通过将弹出的数据作为 pop() 方法的一部分,避免了先调用 top() 方法再调用 pop() 方法之间的潜在竞争条件。这样,pop() 方法在确保栈不为空的情况下直接弹出元素,降低了出错的风险。这种设计方式是尽可能减少提供给外界的接口,从而避免可能存在的竞态条件。但明显,top 语义确实被删除了,因为 top 本意是获取栈顶元素,而不包含删除的含义,但我们这里合并之后,如果你要获取栈顶元素,你同时还表明要移除栈顶元素。最新的优化已移除 top 接口。

在第一个重载的 pop() 中,代码本身可能会抛出一个 empty_stack 的异常,但由于什么都没有修改,所以是安全的。创建 result 可能会抛出一个异常,有几个方面的原因:对 std::make_shared 的调用,可能因为无法为新对象以及引用计数需要的内部数据分配出足够的内存而抛出异常;或者在拷贝/移动到新分配内存的时候,返回的数据项的拷贝构造或移动构造函数可能抛出异常。两种情况下,C++运行库和标准库会确保没有内存泄露,并且新创建的对象(如果有的话)会被正确的销毁。因为仍然没有对栈进行任何修改,所以也不会有问题。调用 m_stack.pop() 保证不会抛出异常,随后是返回结果,所以这个重载的 pop() 函数是异常安全的。

第二个重载的 pop() 类似,不过这次是在拷贝赋值或移动赋值时可能抛出异常,而不是在构造新对象和 std::shared_ptr 实例时。再次,直到调用 m_stack.pop()(pop 仍然保证不会抛出异常)前,没有修改数据结构,所以这个函数也是异常安全的。

这里有几个可能导致死锁的机会,因为你在持有锁的时候调用了用户代码:数据项上的拷贝构造或移动构造和拷贝赋值或移动赋值操作,也可能是用户自定义的 new 操作符。如果这些函数或者调用了栈上的成员函数(而栈正在插入或移除数据项),或者需要任何类型的锁,而在调用栈成员函数时又持有了另一把锁,那么就有可能出现死锁。但明智的做法是要求栈的用户负责确保这一点;你不能期望在不拷贝或不为它分配内存的情况下将数据项添加到栈或从栈中删除。

总结

由于所有成员函数都使用 std::lock_guard<> 保护数据,所以不管多少线程调用栈成员函数都是安全的。唯一不安全的成员函数是构造函数和析构函数,但这不是问题;对象只能被构造一次,也只能被销毁一次。不管并发与否,调用一个不完全构造的对象或是部分销毁的对象的成员函数永远都不可取。因此,用户必须确保其他线程直到栈完全构造才能访问它,并且必须确保在栈对象销毁前,所有线程都已经停止访问栈。

尽管多个线程并发调用成员函数是安全的,但由于使用了锁,每次只有一个线程在栈数据结构中做一些工作。线程的串行化会潜在的限制应用程序的性能,因为在栈上有严重的争用:当一个线程在等待锁时,它没有做任何有用的工作。同样,栈也没有提供什么方法等待添加一个数据项,所以如果线程需要等待时,它必须周期性地调用 empty() 或pop(),并且捕获 empty_stack 异常。

如果这种场景是必须的,那这种栈实现就是个糟糕的选择,因为等待线程要么消耗宝贵的资源去检查数据,要么要求用户编写外部等待和通知的代码(例如,使用条件变量),这就使内部上锁没有必要,因而造成浪费。接下来的队列展示了一种使用数据结构内部的条件变量将这种等待合并到数据结构本身的方法,接下来我们看一下这个。

使用锁和条件变量的安全队列

有锁和条件变量组合的安全队列初版

代码地址:有锁和条件变量组合的安全队列初版

有之前线程安全栈的代码经验,部分代码无需重复讲解,这里重点谈一谈条件变量的引入会给我们的代码带来怎样的变化?这里关注围绕 pop 方法定义的接口:try_pop 和 wait_pop,前者就是实现安全栈的两个 pop 方法,几乎区别,只是有个方法返回值为 bool。我们重点关注对象是 wait_pop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
template<typename T>
void SafeQueue<T>::wait_pop(T &data) {
std::unique_lock<std::mutex> ul(m_mtx);
m_have.wait(ul,[this](){
return !m_queue.empty();
}); // m_queue have data
data = std::move(m_queue.front());
m_queue.pop();
}
template<typename T>
std::shared_ptr<T> SafeQueue<T>::wait_pop() {
std::unique_lock<std::mutex> ul(m_mtx);
m_have.wait(ul,[this](){
return !m_queue.empty();
}); // m_queue have data
std::shared_ptr<T> result(std::make_shared<T>(std::move(m_queue.front())));
m_queue.pop();
return result;
}

在之前的栈中,如果没有元素就会抛出异常,这实在是不恰当的做法,面对这种情况,我们还需要处理这个异常,我想不会因为栈中没有数据就不去获取吧?毕竟这可能是暂时的,往往我们就得去循环判断 empty 情况。

新的 wait_pop() 函数解决了在栈中碰到的等待队列条目的问题;比起持续调用 empty(),等待线程调用 wait_pop() 函数并且数据结构使用条件变量来处理等待。对 m_have.wait() 的调用,直到队列中至少有一个元素时才会返回,所以不用担心会出现空队列的情况,并且数据仍然被互斥锁保护。因此,这些函数不会添加任何新的竞争条件或死锁的可能性,并且将支持不变量。

有锁和条件变量组合的安全队列进行第一次优化

代码地址:有锁和条件变量组合的安全队列进行第一次优化

前面我们采用的是notify_one的方式,所以仅有一个线程被激活,如果被激活的线程异常了,就不能保证该数据被其他线程消费了,解决这个问题,可以采用几个方案:

  1. wai_pop失败的线程修复后再次取一次数据。
  2. notify_one改为notify_all,这样能保证通知所有线程。但是notify_all将导致所有线程竞争,并不可取。
  3. 我们可以通过队列存储智能指针的方式进行,因为智能指针在赋值的时候不会引发异常。

经过考量,作者采用第三种方式来进行优化。

从此,我们的队列中存储的所有元素全部都是 智能指针管理的对象。

1
std::queue<std::shared_ptr<T>> m_queue;

我们在 push 的时候先构造智能指针,如果构造的过程失败了也就不会 push 到队列中,不会污染队列中的数据。再者,之前是把构造智能指针对象放在加锁之后,但这是没有必要的,应该放在加锁之前。

你可能会问难道没有并发安全问题?当然没有,我们只需要在放入队列之前加锁即可,但是构造对象无需加锁,因为每次调用 push 都是在第一个独立的栈中,共享的资源是队列,我们只要管好队列的添加和删除元素时候的并发安全问题就可以了。

1
2
3
4
5
6
7
template<typename T>
void SafeQueue<T>::push(T data) {
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(data)));
std::lock_guard<std::mutex> lg(m_mtx);
m_queue.push(std::move(new_data));
m_have.notify_one();
}

还有,我们队列中既然存储的是智能指针,那么我们 pop系列 的接口本身也是要返回 pop 的对象的,我们需要通过 * 获取智能指针管理的对象本身。就像下面这样:

1
data = std::move(*m_queue.front());

但是我们分析上面的代码,队列 push 和 pop 时采用的是一个mutex,导致 push 和 pop 等操作串行化(pop 和 push 不能独立执行),我们要考虑的是优化锁的精度,提高并发。

有细粒度锁和条件变量组合的安全队列进行第二次优化

部分限制来源于 STL 提供的 std::queue 容器,如果我们自实现队列,那么可以掌控其中的细节,从而提供更细粒度的锁从而实现更高级别的开发。

队列可以基于数组进行开发,也可以基于单链表进行开发,这边选择单链表。

队列1.png

数据项从队列的另一端添加到队列。为了做到这点,队列还有一个 tail 指针,它指向链表中的最后一项。新节点的添加是通过改变最后一项的 next 指针,让它指向新的节点,然后更新 tail 指针指向这个新的数据项。当链表为空时,头/尾指针都为 NULL。

代码地址:单线程基于链表实现的简单队列

使用了 std::unique_ptr<node> 来管理节点,因为这能保证当不再需要它们的时候,它们(以及它们引用的数据)会自动删除,而不必使用显式的 delete。这个所有权链的管理从 head 开始,tail 是指向最后一个节点的裸指针,因为它需要引用 std::unique_ptr<node> 已经拥有的节点。

这个队列在单线程下可以正常工作,但是多线程是万万不可的,有几个事情会带来麻烦。

我们有两个成员变量 head 和 tail,原则上可以使用两个锁分别保护头和尾指针,但这样做有几个问题。

最明显的问题就是 push() 可能同时修改 head 和 tail,所以它必须锁住两个互斥锁。尽管很不幸,但这倒不算是太大的问题,因为锁住两个互斥锁是可能的。关键的问题是 push() 和 pop() 都能访问 next 指针指向的节点:push() 更新 tail->next,然后 try_pop() 读取 head->next。如果队列中只有一个元素,那么 head == tail,所以 head->next 和 tail->next 是同一个对象,并且这个对象需要保护。由于不同时读取 head 和 tail 的话,没法区分它们是否是同一个对象,你现在必须在 push() 和 try_pop() 中锁住同一个锁,所以,也没比以前好多少。

那有什么办法摆脱这个困境吗 ? 通过分离数据使并发成为可能。

你可以通过预先分配一个没有数据的虚拟节点来解决这个问题,以确保队列中至少有一个节点将头部访问的节点与尾部访问的节点分开。对一个空队列,head 和 tail 都指向虚拟节点,而非 NULL 指针。这很好,因为如果队列为空,try_pop() 就不会访问 head->next 了。如果添加一个节点到队列中(所以有一个真实节点),那 head 和 tail 现在会指向不同的节点,所以在 head->next 和 tail->next 上不会有竞争。但缺点是,为了允许虚拟节点的存在,需要增加额外的一层间接性——通过指针来存储数据。

代码地址:分离数据

尽管我们还是基于单线程,还没有并发安全的考虑,但本次代码改动还是很大的,很多值得聊的地方。

1
2
3
4
struct Node{  // 定义节点
std::shared_ptr<T> data;
std::unique_ptr<struct Node> next;
};

最初 data 变量 没有采用智能指针管理,这边补充。

通过引入虚拟节点,我们的 push 仅访问 tail 指针,而无需访问 head 指针。try_pop 需要访问 tail 和 head 指针,但 head 只需要在 与 tail 进行比较的时候需要用到,往后不需要。重大的收益在于虚拟节点的引入,使得 try_pop() 和 push() 永远不用对同一节点进行操作,所以就不再需要一个总的互斥锁了。你可以给 head 和 tail 分别分配一个互斥锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template<typename T>
void StrongestSafeQueue<T>::push(T data) {
std::shared_ptr<T> new_data = std::make_shared<T>(std::move(data)); // 构造 data
std::unique_ptr<Node> new_node(std::make_unique<Node>()); // 构造 Node
tail->data = new_data;
Node* const new_tail = new_node.get();
tail->next = std::move(new_node);
tail = new_tail;
}

template<typename T>
std::shared_ptr<T> StrongestSafeQueue<T>::try_pop() {
if (head.get() == tail){ // 只有虚拟节点,代表队列为空
return std::shared_ptr<T>();
}
std::shared_ptr<T> const result(head->data); // data 成员已经是 shared_ptr 指针了
std::unique_ptr<Node> const old_head = std::move(head);
head = std::move(old_head->next); // 更新头指针
return result;
}

下面通过示意图来解释核心流程:

队列2.png

我们创建 StrongestSafeQueue<T> 对象,就会在构造函数中默认初始化得到一个虚拟节点,这个节点不包含任何有效数据。

添加一个节点,但是这个节点并不是包含有效数据,而是像之前的虚拟节点那样,添加到有效数据放在虚拟节点中(这里现在是这样),那么等到下一次存储有效数据的时候,就是当前创建的这个节点用来存储了:

队列3.png

这就是为什么等我们 try_pop 数据的时候,直接从 head 开始,然后记得更新 head 指向下一个节点即可。

最初还没有弄清楚作者的代码逻辑,我想虚拟头结点不是没有有效数据吗?为什么去访问数据呢?因为我想着是通过虚拟头结点的下一个节点去存储有效数据的,导致出现误会了,绘图之后才算弄清楚了。只要有一个有效节点,那么 tail 和 head 始终不会访问同一个节点,head 指向有效的节点;tail 指向的节点不包含有效数据,但是是用来存储下一个有效数据。

代码地址:开始加锁

push() 很简单,访问 tail 期间都需要互斥锁被锁住,也就是在新分配节点后,对当前尾节点进行赋值之前上锁。锁需要持有到函数结束。

try_pop() 就没那么简单了。首先,需要使用互斥锁锁住 head,一直到不再访问head。这个互斥锁决定了哪一个线程进行弹出操作,所以你会第一时间锁上它。一旦 head 改变,就可以解锁互斥锁;在返回结果时,互斥锁就不需要上锁了。这使得对 tail 的访问需要锁住 tail 互斥锁。因为只需要访问 tail 一次,且只有在读取时才需要互斥锁。所以最好的做法是把它包装到函数中。实际上,因为需要 head 互斥锁被锁住的代码只是成员的一个子集,把它包装到函数中也会更清晰。

因为没有修改任何接口,所以没有新的外部机会导致死锁。在实现内部也不会有死锁;唯一需要获取两个锁的是 pop_head(),这个函数都是先获取 head_mutex 然后 tail_mutex,所以永远不会死锁。

代码地址:等待数据项弹出

把之前的 wait_pop 接口给实现,即引入条件变量。

设计更复杂的基于锁的数据结构

使用锁实现的线程安全的查找表

代码地址:使用锁实现的线程安全的查找表

我们实现的查找表是链地址法,见下:

链地址法.png

就是把 桶中的节点,即链表进行加锁,来保证互斥安全。我们的查找表可以支持并发读,并发写,并发读的时候不会阻塞其他线程。但是并发写的时候会卡住其他线程。基本的并发读写没有问题。

但是对于bucket_type中链表的操作加锁精度并不精细,因为我们采用的是std提供的list容器,所以增删改查等操作都要加同一把锁,导致锁过于粗糙。

使用锁实现的线程安全的链表

代码地址:使用锁实现的线程安全的链表

添加一个对象到链表–头插法

1
2
3
4
5
6
7
template <typename T>
void push_front(T data) {
std: :unique_ptr < Node > new_node = std: :make_unique < Node > (data);
std: :lock_guard < std: :mutex > lg(head.mtx);
new_node - >next = std: :move(head.next);
head.next = std: :move(new_node);
}

准备插入节点之前:

链表1.png

我们锁住虚拟头结点,那么对该节点成员的访问就是线程安全的。因此,让新节点的 next 指向虚拟头结点 next 指向的节点,这是线程安全的。

链表2.png

新节点就插入成功,但还需要更新虚拟头结点,来确保这是一个合理的单链表。由于虚拟头结点依旧还在锁中,不会被外人访问到,而其它接口本身也只能从访问 head 开始,不用担心线程安全问题。

我们可以正常且安全更新 head 的 next 指针,即指向新节点,因为 head 的 next 指针可以安全访问,而后续节点访问必须从 head 开始。

链表3.png

从链表中删除满足某个条件的项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename Predicate>
void remove_if(Predicate P){
Node* current = &head;
std::unique_lock<std::mutex> ul(head.mtx);
while (Node* const next = current->next.get()) {
std::unique_lock<std::mutex> ul_next(next->mtx);
if (P(*next->data)) { // 满足删除条件
std::unique_ptr<Node> old_next = std::move(current->next);
current->next = std::move(next->next);
ul_next.unlock();
} else {
ul.unlock();
current = next;
ul = std::move(ul_next); // 保证 current 的访问是线程安全,因为之前的 ul 已经被 unlock
}
}
}

未加锁之前,我们先存储当前虚拟头结点为 current。这可能存在一个问题,就是最新的虚拟头结点可能会被更换,导致 current 并不一定存储的最新的虚拟头结点?

非也,虚拟头结点本质是一个指针,并不会被更换。回顾之前的插入节点,我们只是先把新节点指向 虚拟头结点的下一个节点,再去把 头结点指向这个新节点,从头到尾是线程安全,并且本质上没有更改虚拟头结点本身,所以不存在上述问题。

要访问 head->next 或者 current->next,需要对 head 加锁。需要访问 next->next ,所以要对 next 加锁。至此,我们已经加两把锁了。

链表4.png

满足删除条件

需要安全访问两个节点,暂时不解开任何一把锁,等安全删除 next 节点之后才考虑解锁。先把 current - >next 节点用智能指针管理,待离开作用域自行删除。

接下来就是通过更改 current 的 next 指向来安全删除满足条件的节点 next,随后就可以解开 next 的锁了。

1
2
3
std: :unique_ptr < Node > old_next = std: :move(current - >next);
current - >next = std: :move(next - >next);
ul_next.unlock();

不满足删除条件

不需要删除任何节点,只需要移动 current / head 指针到 next 节点上即可。由于 current / head 指针已经不需要访问,解锁即可。让 current 指向 下一个节点,即 next。在下一次的循环里会访问 current,为了安全性,我们需要把锁给到 ul,这样它就保证 current 访问的安全性,这点赋值锁是相当巧妙的,因为在最外层就是一把锁,这个锁通过这种方式,就已经锁住 current 了。

1
2
3
ul.unlock();
current = next;
ul = std::move(ul_next); // 保证 current 的访问是线程安全,因为之前的 ul 已经被 unlock

更新链表中满足某个条件的项

1
2
3
4
5
6
7
8
9
10
11
12
template < typename Function > void for_each(Function F) {
Node * current = &head;
std: :unique_lock < std: :mutex > ul(head.mtx); // 方便随时解锁
while (Node * const next = current - >next.get()) {
// 要操作 current 下一个 节点,自然要 加其锁
std: :unique_lock < std: :mutex > ul_next(next - >mtx);
ul.unlock(); // 已经可以安全访问到 current 下一个节点
F( * next - >data);
current = next;
ul = std: :move(ul_next); // 保证 current 的访问是线程安全,因为之前的 ul 已经被 unlock
}
}

如果明白上面更新锁的操作,这个地方就很好理解。

但我们能够安全访问到 next 节点,就可以安心释放 ul 锁,因为我们无需访问 current 节点。等到更新 next 节点数据之后,就交换锁,把 ul_next 锁 换到 ul 锁,回到循环开始访问 current 的时候,就是已经加锁状态,可以继续安全访问 current 节点了。

从链表中查找满足某个条件的项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
template < typename Predicate > std: :shared_ptr < T > find_first_if(Predicate P) {
Node * current = &head;
std: :unique_lock < std: :mutex > ul(head.mtx);
while (Node * const next = current - >next.get()) {
std: :unique_lock < std: :mutex > ul_next(next - >mtx);
ul.unlock();
if (P( * next - >data)) {
return next - >data;
}
current = next;
ul = std: :move(ul_next); // 保证 current 的访问是线程安全,因为之前的 ul 已经被 unlock
}
return std: :shared_ptr < T > ();
}