Contents
基于锁的并发数据结构的设计¶
设计并发数据结构的建议¶
设计并发数据结构要考虑两点,一是确保访问thread-safe,二是允许真正的并发访问。thread-safe 基本要求如下
- 数据结构的不变量被一个线程破坏时,确保不被线程看到此状态。
- 提供操作完整的函数来避免数据结构接口中固有的 race condition。
- 注意数据结构出现异常时的行为,以确保不变量不被破坏。
- 限制锁的范围,避免可能的嵌套锁,最小化死锁的概率。
关于真正的并发访问没有太多建议,但作为数据结构的设计者需要考虑以下问题
- 部分操作是否可以在锁的范围外执行。
- 数据结构的不同部分是否被不同的 mutex 保护。
- 是否所有操作需要同级别的保护。
- 在不影响操作语义的前提下,能否对数据结构做一个简单的修改提高并发访问的概率。
这些问题可以总结为一点,即如何最小化线程对被 mutex 保护的数据的轮流访问,以及最大化真实的并发量。
使用锁实现thread-safe stack¶
struct emptyStack : std::exception
{
const char* what() const noexcept
{
return "empty stack!";
}
};
template<typename T>
class A {
std::stack<T> s;
mutable std::mutex m;
public:
A() : s(std::stack<T>()) {}
A(const A& rhs)
{
std::lock_guard<std::mutex> l(rhs.m);
s = rhs.s;
}
A& operator=(const A&) = delete;
void push(T n)
{
std::lock_guard<std::mutex> l(m);
s.push(std::move(n));
}
std::shared_ptr<T> pop() //@ 返回一个指向栈顶元素的指针
{
std::lock_guard<std::mutex> l(m);
if (s.empty()) throw emptyStack();
const std::shared_ptr<T> res(std::make_shared<T>(std::move(s.top())));
s.pop();
return res;
}
void pop(T& n) //@ 传引用获取结果
{
std::lock_guard<std::mutex> l(m);
if (s.empty()) throw emptyStack();
n = std::move(s.top());
s.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> l(m);
return s.empty();
}
};
使用锁和条件变量实现thread-safe queue¶
template<typename T>
class A {
mutable std::mutex m;
std::queue<T> q;
std::condition_variable cv;
public:
A() {}
A(const A& rhs)
{
std::lock_guard<std::mutex> l(rhs.m);
q = rhs.q;
}
void push(T x)
{
std::lock_guard<std::mutex> l(m);
q.push(std::move(x));
cv.notify_one();
}
void wait_and_pop(T& x)
{
std::unique_lock<std::mutex> l(m);
//@ 在queue中有元素时才返回,而不必持续调用empty
cv.wait(l, [this] { return !q.empty(); });
x = std::move(q.front());
q.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> l(m);
cv.wait(l, [this] { return !q.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(std::move(q.front())));
q.pop();
return res;
}
bool try_pop(T& x)
{
std::lock_guard<std::mutex> l(m);
if (q.empty()) return false;
x = std::move(q.front());
q.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> l(m);
if (q.empty()) return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(std::move(q.front())));
q.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> l(m);
//@ 其他线程可能有此对象(拷贝构造)所以要上锁
return q.empty();
}
};
这引入了一个异常安全问题,多个线程处于等待时,notify_one 只会唤醒一个线程,如果这个线程在wait_and_pop中(比如构造 std::shared_ptr 对象时就可能)抛出异常,其余线程将永远不会唤醒。
- 将
notify_one
改为
notify_all,这样就会唤醒所有线程,代价是大部分线程发现
queue仍为empty时,又会继续休眠。 - 抛出异常时让
wait_and_pop调用 notify_one,从而唤醒另一个线程。 - 将
std::shared_ptr
的初始化移到
push中,并且内部的 std::queue 不直接存储值,而是存储管理值的std::shared_ptr 。
#include <memory>
#include <mutex>
#include <condition_variable>
#include <queue>
template<typename T>
class A {
mutable std::mutex m;
std::queue<std::shared_ptr<T>> q; //@ 之前为std::queue<T> q
std::condition_variable cv;
public:
A() {}
A(const A& rhs)
{
std::lock_guard<std::mutex> l(rhs.m);
q = rhs.q;
}
void push(T x)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(x)));
//@ 上面的构造在锁外完成,之前只能在pop中且持有锁时完成
//@ 内存分配操作开销很大,这种做法减少了mutex的持有时间,提升了性能
std::lock_guard<std::mutex> l(m);
q.push(data); //@ 之前为q.push(std::move(x))
cv.notify_one();
}
void wait_and_pop(T& x)
{
std::unique_lock<std::mutex> l(m);
cv.wait(l, [this] { return !q.empty(); });
x = std::move(*q.front()); //@ 之前为std::move(q.front())
q.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> l(m);
cv.wait(l, [this] { return !q.empty(); });
std::shared_ptr<T> res = q.front();
//@ 之前为std::make_shared<T>(std::move(q.front()))
//@ 现在构造转移到了push中
q.pop();
return res;
}
bool try_pop(T& x)
{
std::lock_guard<std::mutex> l(m);
if (q.empty()) return false;
x = std::move(*q.front()); //@ 之前为std::move(q.front())
q.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> l(m);
if (q.empty()) return std::shared_ptr<T>();
std::shared_ptr<T> res = q.front();
//@ 之前为std::make_shared<T>(std::move(q.front()))
q.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> l(m);
return q.empty();
}
};
使用细粒度锁和条件变量实现thread-safe queue¶
和 thread-safe stack 一样,使用 mutex
保护了整个数据结构,但限制了对并发的支持。多线程在各种成员函数中被阻塞,而只有一个线程能在同一时间做任何事。不过这种限制主要是因为内部实现使用的是
std::queue,为了支持更高级别的并发就需要提供更细粒度的锁,为了提供更细粒度的锁就要换一种实现方式。
最简单的 queue 实现方式是包含头尾指针的单链表
template<typename T>
class A {
struct node {
T val;
std::unique_ptr<node> next;
node(T x) : val(std::move(x)) {}
};
std::unique_ptr<node> head;
node* tail;
public:
A() {}
A(const A&) = delete;
A& operator=(const A&) = delete;
void push(T x)
{
std::unique_ptr<node> p(new node(std::move(x))); //@ 新建一个值为x的节点
node* const oldHead = p.get(); //@ 获取新节点的原始指针
if (tail)
{ //@ 2:如果尾节点不为空则next设为新节点
tail->next = std::move(p);
}
else
{ //@ 如果尾节点为空(说明无元素)则头节点设为新节点
head = std::move(p);
}
tail = oldHead; //@ tail设为新节点的原始指针
}
std::shared_ptr<T> try_pop()
{
if (!head)
return std::shared_ptr<T>(); //@ 头节点为空则返回空指针
std::shared_ptr<T> res(std::make_shared<T>(std::move(head->val))); //@ 保存头节点值
std::unique_ptr<node> oldHead = std::move(head); //@ 获取头节点
head = std::move(oldHead->next); //@ 1:头节点指向下一个节点
return res; //@ 返回之前保存的头节点的值
}
};
这是一个单线程下没问题的
queue,但在多线程下就有明显问题,即使用两个 mutex
分别保护头尾指针。push 可以同时修改头尾指针,会对两个 mutex
上锁,更严重的是 push 和 try_pop 都能访问 next
节点(见注释1和2处),仅有一个元素时头尾指针相等,两处的 next
也是同一对象,于是 try_pop 读 next 与 push 写 next
就产生了竞争,锁的也是同一个 mutex。
这个问题可以通过在 queue 的最后预设一个 dummy node 解决:
template<typename T>
class A {
struct node {
std::shared_ptr<T> val; //@ 之前为T val
std::unique_ptr<node> next;
};
std::unique_ptr<node> head;
node* tail;
public:
A() : head(new node), tail(head.get()) {} //@ 预设头尾节点指向一处(未存储任何值)
A(const A&) = delete;
A& operator=(const A&) = delete;
void push(T x)
{ //@ 现在push只访问tail而不访问head,意味着不再会与try_pop操作同一节点而争夺锁
std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x))); //@ 新建一个保存新值的指针
std::unique_ptr<node> p(new node); //@ 新建一个不存储值的新节点(用作新的尾节点)
node* const newTail = p.get(); //@ 获取新节点的原始指针
tail->val = newVal; //@ 当前尾节点的值设为新值(原本不存储值)
tail->next = std::move(p); //@ tail->next设为新节点
tail = newTail; //@ 最后令tail指向新节点
}
std::shared_ptr<T> try_pop()
{ //@ 同时访问head和tail只在最初的比较上,锁是短暂的
//@ 之前的判断条件为if (!head),现在为头节点与尾节点指向一处
if (head.get() == tail)
return std::shared_ptr<T>(); //@ 返回空指针
std::shared_ptr<T> res(head->val); //@ 保存头节点的值,现在为std::shared_ptr类型
std::unique_ptr<node> oldHead = std::move(head); //@ 获取旧的头节点
head = std::move(oldHead->next); //@ 头节点指向下一个节点
return res; //@ 返回之前保存的头节点的值
}
};
接着加上锁,锁的范围应该尽可能小。对于 push 来说很简单,对 tail
的访问上锁即可。对于 try_pop 来说,在弹出 head(即将 head
设为 nex t)之前都应该对 head
加锁,在最后返回时则不需要锁,因此可以把加锁的部分提取到一个新函数中。在比较
head 与 tail 时,对 tail 也存在短暂的访问,因此对 tail
的访问也需要加锁。
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
std::unique_ptr<node> next;
};
std::unique_ptr<node> head;
node* tail;
std::mutex hm; //@ head mutex
std::mutex tm; //@ tail mutex
node* get_tail()
{
std::lock_guard<std::mutex> l(tm);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::lock_guard<std::mutex> l(hm);
if (head.get() == get_tail())
return nullptr;
std::unique_ptr<node> oldHead = std::move(head);
head = std::move(oldHead->next);
return oldHead;
}
public:
A() : head(new node), tail(head.get()) {}
A(const A&) = delete;
A& operator=(const A&) = delete;
//@ push有两种可能产生异常的情况:一是给mutex上锁,但数据在上锁成功后才会修改
//@ 二是构建智能指针对象时可能抛出异常,但智能指针本身是异常安全的,异常时会释放
void push(T x) //@ 因此push是异常安全的
{
std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x)));
std::unique_ptr<node> p(new node);
node* const newTail = p.get();
std::lock_guard<std::mutex> l(tm); //@ 此处加锁
tail->val = newVal;
tail->next = std::move(p);
tail = newTail;
}
//@ 同理try_pop也是异常安全的
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> oldHead = pop_head();
return oldHead ? oldHead->val : std::shared_ptr<T>();
}
};
注意在 try_pop 中,tail mutex 要在 head mutex 之中加锁:
std::unique_ptr<node> pop_head()
{
node* const oldTail = get_tail(); //@ 把tail mutex移到head mutex之外是有问题的
//@ 这两行代码之间,head和tail都可能被改变
std::lock_guard<std::mutex> l(hm);
if (head.get() == oldTail) //@ 比较的是最新的head和最老的tail
...
}
这个实现在并发度上也比最初版本的 queue
要好。由于使用了细粒度锁,push
中创建新值和新节点都没上锁,多线程并发创建新值和新节点就不是问题。同一时间内,只有一个线程能添加新节点,但这只需要一个指针赋值操作,持有锁的时间很短。
try_pop 中对 tail mutex
的持有时间也很短,只是用来做一次比较,因此 try_pop 和 push
几乎可以同时调用。try_pop 中持有 head mutex
所做的也只是指针赋值操作,开销较大的析构操作在锁外进行(智能指针的析构特性)。这意味着同一时间内,虽然只有一个线程能调用
pop_head ,但允许多个线程删除节点并安全返回数据,这就提升了
try_pop 的并发调用数量。
接着还需要实现 wait_and_pop,下面是最终版本的
thread-safe queue :
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
std::unique_ptr<node> next;
};
std::unique_ptr<node> head;
node* tail;
std::mutex hm; //@ head mutex
std::mutex tm; //@ tail mutex
std::condition_variable cv;
node* get_tail()
{
std::lock_guard<std::mutex> l(tm);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::unique_ptr<node> oldHead = std::move(head);
head = std::move(oldHead->next);
return oldHead;
}
std::unique_lock<std::mutex> wait_for_data()
{
std::unique_lock<std::mutex> l(hm);
cv.wait(l, [&] { return head.get() != get_tail(); });
return std::move(l);
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> l(wait_for_data());
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T& x)
{
std::unique_lock<std::mutex> l(wait_for_data());
x = std::move(*head->val); //@ 即*(head->val)
return pop_head();
}
std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> l(hm);
if (head.get() == get_tail()) return std::unique_ptr<node>();
return pop_head();
}
std::unique_ptr<node> try_pop_head(T& x)
{
std::lock_guard<std::mutex> l(hm);
if (head.get() == get_tail()) return std::unique_ptr<node>();
x = std::move(*head->val);
return pop_head();
}
public:
A() : head(new node), tail(head.get()) {}
A(const A&) = delete;
A& operator=(const A&) = delete;
void push(T x)
{
std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x)));
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> l(tm);
tail->val = newVal;
node* const newTail = p.get();
tail->next = std::move(p);
tail = newTail;
}
cv.notify_one();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_ptr<node> oldHead = wait_pop_head();
return oldHead->val;
}
void wait_and_pop(T& x)
{
std::unique_ptr<node> oldHead = wait_pop_head(x);
}
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> oldHead = try_pop_head();
return oldHead ? oldHead->val : std::shared_ptr<T>();
}
bool try_pop(T& x)
{
std::unique_ptr<node> oldHead = try_pop_head(x);
return oldHead;
}
bool empty()
{
std::lock_guard<std::mutex> l(hm);
return head.get() == get_tail();
}
};
这个实现是之后实现 lock-free queue 的基础。它是一个
unbounded queue,只要内存足够,线程就能持续push 新值。而
bounded queue 会限定元素数量的最大值,当 queue
填满时,push 会失败或者阻塞至 pop
一个元素,这对分配线程工作是有用的。将 unbounded queue 扩展成
bounded queue 不是难事,这里不再赘述。
使用锁实现thread-safe lookup table¶
lookup table 就是通过 key 查询 value 的数据结构,比如
std::map,但标准库关联容器的接口同样不适合并发访问,最大的问题在于迭代器,其他线程删除元素时导致迭代器失效,因此
lookup table 的接口设计就要跳过迭代器。
为了使用细粒度锁,就不应该使用标准库容器。可选的关联容器数据结构有三种:
- 第一种方式是二叉树(如红黑树),但每次查找修改都要从访问根节点开始,也就表示根节点需要上锁,尽管沿着树向下访问节点时会解锁,但这个比起覆盖整个数据结构的单个锁好不了多少。
- 第二种方式是有序数组,这比二叉树还差,因为无法提前得知一个给定的值应该放在哪,于是同样需要一个覆盖整个数组的锁。
- 第三种方式是哈希表。假如有一个固定数量的桶,一个
key属于哪个桶就取决于key的属性和哈希函数,这意味着可以安全地分开锁住每个桶。如果复用一个支持多个读单个写的mutex,就能将并发度提高相当于桶数量的倍数。
template<typename K, typename V, typename Hash = std::hash<K>>
class A {
class Bucket {
public:
std::list<std::pair<K, V>> data;
mutable std::shared_mutex m; //@ 每个桶都用这个锁保护
V value_for(const K& k, const V& v) const //@ 如果未找到则返回v
{ //@ 没有修改任何值,异常安全
std::shared_lock<std::shared_mutex> l(m); //@ 只读锁,可共享
auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
return it == data.end() ? v : it->second;
}
void add_or_update_mapping(const K& k, const V& v) //@ 找到则修改,未找到则添加
{
std::unique_lock<std::shared_mutex> l(m); //@ 写,单独占用
auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
if (it == data.end())
{
data.emplace_back(k, v); //@ emplace_back异常安全
}
else
{
it->second = v; //@ 赋值可能抛异常,但值是用户提供的,可放心让用户处理
}
}
void remove_mapping(const K& k)
{ //@ std::list::erase不会抛异常,因此异常安全
std::unique_lock<std::shared_mutex> l(m); //@ 写,单独占用
auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
if (it != data.end()) data.erase(it);
}
};
std::vector<std::unique_ptr<Bucket>> buckets;
Hash hasher;
Bucket& get_bucket(const K& k) const
{ //@ 桶数固定因此可以无锁调用
return *buckets[hasher(k) % buckets.size()];
}
public:
//@ 桶的默认数量为19(一般用x%桶数决定放置x的桶的索引,桶数为质数可以使得桶分布均匀)
A(unsigned n = 19, const Hash& h = Hash{}) : buckets(n), hasher(h)
{
for (auto& x : buckets)
x.reset(new Bucket);
}
A(const A&) = delete;
A& operator=(const A&) = delete;
V value_for(const K& k, const V& v = V{}) const
{
return get_bucket(k).value_for(k, v);
}
void add_or_update_mapping(const K& k, const V& v)
{
get_bucket(k).add_or_update_mapping(k, v);
}
void remove_mapping(const K& k)
{
get_bucket(k).remove_mapping(k);
}
//@ 为了方便使用,提供一个到std::map的映射
std::map<K, V> get_map() const
{
std::vector<std::unique_lock<std::shared_mutex>> l;
for (auto& x : buckets)
{
l.push_back(std::unique_lock<std::shared_mutex>(x->m));
}
std::map<K, V> res;
for (auto& x : buckets)
{
for (auto& y : x->data)
res.insert(y);
}
return res;
}
};
使用锁实现thread-safe list¶
template<typename T>
class A {
struct node {
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node() : next() {}
node(const T& x) : data(std::make_shared<T>(x)) {}
};
node head;
public:
A() {}
~A() { remove_if([](const node&) { return true; }); }
A(const A&) = delete;
A& operator=(const A&) = delete;
void push_front(const T& x)
{
std::unique_ptr<node> newNode(new node(x));
std::lock_guard<std::mutex> l(head.m);
newNode->next = std::move(head.next);
head.next = std::move(newNode);
}
template<typename F>
void for_each(F f)
{
node* cur = &head;
std::unique_lock<std::mutex> l(head.m);
while (node* const next = cur->next.get())
{
std::unique_lock<std::mutex> nextLock(next->m);
l.unlock(); //@ 锁住了下一节点,因此可以释放上一节点的锁
f(*next->data);
cur = next; //@ 当前节点指向下一节点
l = std::move(nextLock); //@ 转交下一节点锁的所有权,循环上述过程
}
}
template<typename F>
std::shared_ptr<T> find_first_if(F f)
{
node* cur = &head;
std::unique_lock<std::mutex> l(head.m);
while (node* const next = cur->next.get())
{
std::unique_lock<std::mutex> nextLock(next->m);
l.unlock();
if (f(*next->data))
return next->data; //@ f返回true时则返回目标值,无需继续查找
cur = next;
l = std::move(nextLock);
}
return std::shared_ptr<T>();
}
template<typename F>
void remove_if(F f)
{
node* cur = &head;
std::unique_lock<std::mutex> l(head.m);
while (node* const next = cur->next.get())
{
std::unique_lock<std::mutex> nextLock(next->m);
if (f(*next->data))
{ //@ f为true时则移除下一节点
std::unique_ptr<node> oldNext = std::move(cur->next);
cur->next = std::move(next->next); //@ 下一节点设为下下节点
nextLock.unlock();
}
else
{ //@ 否则继续转至下一节点
l.unlock();
cur = next;
l = std::move(nextLock);
}
}
}
};