Contents
线程间共享数据¶
如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。
线程间共享数据存在的问题¶
不变量(invariant):关于一个特定数据结构总为 true
的语句。比如:双向链表的两个相邻节点 A 和 B,A 的后指针一定指向 B,B
的前指针一定指向
A。有时程序为了方便会暂时破坏不变量,这通常发生于更新复杂数据结构的过程中,比如删除双向链表中的一个节点
N,要先让 N 的前一个节点指向N的后一个节点(不变量被破坏),再让 N
的后节点指向前节点,最后删除 N(此时不变量重新恢复)。
- 线程修改共享数据时,就会发生破坏不变量的情况,此时如果有其他线程访问,就可能导致不变量被永久性破坏,这就是 race condition。
- 如果线程执行顺序的先后对结果无影响,则为不需要关心的良性竞争。需要关心的是不变量被破坏时产生的 race condition。
- C++ 标准中定义了 data race 的概念,指代一种特定的 race condition,即并发修改单个对象。data race 会造成未定义行为。
使用mutex保护数据¶
使用 mutex 在访问共享数据前加锁,访问结束后解锁。一个线程用特定的
mutex 锁定后,其他线程必须等待该线程的 mutex
解锁才能访问共享数据。
- C++提供了
std::mutex
来创建一个
mutex,可通过 std::mutex::lock 加锁,通过 std::mutex::unlock 解锁,但一般不直接使用这两个函数。 - std::lock_guard 是一个用 std::mutex 构造的 RAII 模板类。
std::list<int> v;
std::mutex m;
void f(int n)
{
//@ C++17中引入了类模板实参推断,可简写为std::lock_guard l(m);
std::lock_guard<std::mutex> l(m);
v.emplace_back(n);
}
bool listContains(int n)
{
std::lock_guard<std::mutex> l(m);
return std::find(std::begin(v), std::end(v), n) != std::end(v);
}
- C++17提供了加强版的 std::scoped_lock,它可以接受任意数量的 std::mutex,可完全取代 std::lock_guard
std::scoped_lock g(m1, m2);
一般 mutex 和要保护的数据一起放在类中,定义为 private
数据成员,而非全局变量,这样能让代码更清晰。但如果某个成员函数返回指向数据成员的指针或引用,则通过这个指针的访问行为不会被
mutex 限制,因此需要谨慎设置接口,确保 mutex 能锁住数据。
class A {
private:
int i;
public:
void doSomething();
};
class B {
private:
A data;
std::mutex m;
public:
template<typename F>
void processData(F f)
{
std::scoped_lock l(m);
f(data); //@ 传递一个被保护的数据给用户函数
}
};
A* p;
void oops(A& a)
{
p = &a;
}
B b;
void foo()
{
b.processData(oops); //@ 传递了一个恶意函数
p->doSomething(); //@ 未锁定mutex的情况下访问数据
}
接口内在的条件竞争¶
即便在很简单的接口中,也可能遇到 race condition:
std::stack<int> s;
if (!s.empty())
{
int n = s.top();
s.pop();
}
- 上述代码先检查非空再获取栈顶元素,在单线程中是安全的,但在多线程中,检查非空之后,如果其他线程先
pop,就会导致当前线程top出错。这是一个经典的race condition,即使用mutex也不能阻止,这就是接口固有的问题,解决方法是改变接口的设计。 - 另一个潜在的竞争是,如果两个线程都还没
pop,而是分别获取了top,虽然不会产生未定义行为,但这种对同一值处理了两次的行为更为严重,因为看起来没有任何错误,很难定位 bug。 - 既然如此,为什么不直接让
pop返回栈顶元素。原因在于,假设有一个stack<vector<int>>,拷贝vector时需要在堆上分配内存,如果系统负载严重或资源有限(比如vector有大量元素),vector的拷贝构造函数就会抛出 std::bad_alloc 异常。如果pop可以返回栈顶元素值,返回一定是最后执行的语句,stack在返回前已经弹出了元素,但如果拷贝返回值时抛出异常,就会导致弹出的数据丢失(从栈上移除但拷贝失败)。因此 std::stack 的设计者将这个操作分解为top和pop两部分,但这样的分割却造成了 race condition。
解决方案:
方案1:传入一个引用
将变量的引用作为参数,传入 pop() 函数中获取想要的弹出值:
std::vector<int> result;
some_stack.pop(result);
缺陷:
- 需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的。
- 元素类型可能不支持赋值。
方案2:无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的 pop() 函数来说,只有“异常安全”方面的担忧。
使用 std::is_nothrow copy_constructible和 std::is_nothrow_move_constructible 即可保证不抛异常。
缺陷:
- 过于局限,抛异常的构造函数还是更常见的。
方案3:返回指向弹出值的指针
返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。
缺陷:
- 返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:
int),内存管理的开销要远大于直接返回值。 - 使用
std::shared_ptr是个不错的选择;不仅能避免内存泄露,而且标准库能够完全控制内存分配方案。
综合方案
第四种方案是结合方案一二或者一三,比如结合方案一三实现一个线程安全的
stack :
struct emptyStack :std::exception
{
const char* what()const noexcept
{
return "empty stack";
}
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> s;
mutuable std::mutex m;
public:
threadsafe_stack() :s(std::stack()) {}
threadsafe_stack(const threadsafe_stack& rhs)
{
std::lock_guard<std::mutex> l(m);
s = rhs.s;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T n)
{
std::lock_guard<std::mutex> l(m);
s.push(std::move(n));
}
//@ 返回一个指向栈顶元素的指针
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> l(m);
if (s.empty)
throw emptyStack();
const std::shared_ptr<T> res(std::make_shared<T>(std::move(s.top())));
s.pop();
return res;
}
//@ 传引用获取结果
void pop(T& n)
{
std::lock_guard<std::mutex> l(m);
if (s.empty)
throw emptyStack();
n = std::move(s.top());
s.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> l(m);
return s.empty();
}
};
死锁¶
- 死锁的四个必要条件:互斥、占有且等待、不可抢占、循环等待。
- 避免死锁通常建议让两个
mutex以相同顺序上锁,总是先锁A再锁B,但这并不适用所有情况。
std::lock¶
std::lock
可以一次性锁住多个 mutex,并且没有死锁风险。
class A {
public:
explicit A(int x) : i(x) {}
int i;
std::mutex m;
};
void f(A& from, A& to, int n)
{
std::lock(from.m, to.m);
/*
下面按固定顺序加锁,看似不会有死锁的问题,但如果没有std::lock同时上锁,
另一线程中执行f(to, from, n),两个锁的顺序就反了过来,从而可能导致死锁
*/
//@ std::adopt_lock表示获取m的所有权
std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);
from.i -= n;
to.i += n;
}
unique_lock¶
- std::unique_lock
比
std::lock_guard
更加灵活:
- 可以指定参数 std::adopt_lock 管理mutex。
- 可以指定参数
std::defer_lock
表示mutex应保持解锁状态,以使
mutex能被 std::unique_lock::lock获取。 - 可以把 std::unique_lock 传给 std::lock
std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
//@ std::defer_lock表示不获取m的所有权,因此m还未上锁
std::lock(lock1, lock2); //@ 此处上锁
- std::unique_lock比 std::lock_guard占用的空间多,会稍慢一点,如果不需要更灵活的锁,依然可以使用std::lock_guard。另一种要求灵活性的情况是转移锁的所有权到另一个作用域。
std::unique_lock<std::mutex> getLock()
{
extern std::mutex m;
std::unique_lock<std::mutex> l(m);
prepareData();
return l; //@ 不需要std::move(编译器负责调用移动构造函数)
}
void f()
{
std::unique_lock<std::mutex> l(getLock());
doSomething();
}
- 对一些费时的操作(如文件IO)上锁可能造成很多操作被阻塞,可以在面对这些操作时先解锁
void f()
{
std::unique_lock<std::mutex> l(m);
auto data = getData();
l.unlock(); //@ 费时操作没有必要持有锁,先解锁
auto res = process(data);
l.lock(); //@ 为了写入数据再次上锁
writeResult(data, res);
}
- 如果支持 C++17,最易最优的同时上锁方法是使用 std::scoped_lock 。
std::scoped_lock l(from.m, to.m);
避免死锁的建议¶
- 建议1:一个线程已经获取一个锁时就不要获取第二个。如果每个线程只有一个锁,锁上就不会产生死锁(但除了互斥锁,其他方面也可能造成死锁,比如即使无锁,线程间相互等待(互相
join)也可能造成死锁)。 - 建议2:持有锁时避免调用用户提供的代码。用户提供的代码可能做任何事,包括获取锁,如果持有锁时调用用户代码获取锁,就会违反第一个建议,并造成死锁。但有时调用用户代码是无法避免。
- 建议3:按固定顺序获取锁。如果必须获取多个锁且不能用 std::lock 同时获取,最好在每个线程上用固定顺序获取。
- 建议4:如果一个锁被低层持有,就不允许再上锁。
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hirearchy_value;
unsigned long previous_hirearchy_value;
//@ thread_local 的值来代表当前线程的层级值
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation()
{
if (this_thread_hierarchy_value < hirearchy_value)
{
throw std::logic_error("mutex hierachy violated");
}
}
void update_hierarchy_value()
{
//@ 先存储当前线程的层级值(用于解锁时恢复)
previous_hirearchy_value = this_thread_hierarchy_value;
this_thread_hierarchy_value = hirearchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value) :
hirearchy_value(value),
previous_hirearchy_value(0)
{}
void lock()
{
check_for_hierarchy_violation(); //@ 要求线程层级值大于锁的层级值
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchy_value = previous_hirearchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if (!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
//@ this_thread_hierarchy_value 被初始化为最大值,所以最初所有线程都能被锁住。
//@ 因为其声明中有 thread_local,所以每个线程都有其拷贝副本
//@ 这样线程中变量状态完全独立,当从另一个线程进行读取时,变量的状态也完全独立。
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
使用示例:
hierarchical_mutex high(10000);
hierarchical_mutex mid(6000);
hierarchical_mutex low(5000);
void lf() //@ 最低层函数
{
std::scoped_lock l(low);
}
void hf()
{
std::scoped_lock l(high);
lf(); //@ 可以调用低层函数
}
void mf()
{
std::scoped_lock l(mid);
hf(); //@ 中层调用了高层函数,违反了层次结构
}
其他保护共享数据的可选方式¶
保护共享数据的初始化¶
延迟初始化在单线程中很常见:
std::shared_ptr<A> P;
void f()
{
if (!p)
{
p.reset(new A); //@ 在多线程中这里需要保护
}
p->doSomething();
}
但在多线程直接上锁会导致不必要的线程资源阻塞:
std::shared_ptr<A> P;
std::mutex m;
void f()
{
std::unique_lock<std::mutex> l(m); //@ 所有线程会在此处阻塞
if (!p)
{
p.reset(new A);
}
l.unlock();
p->doSomething();
}
常见的方法是双重检查锁模式:
void f()
{
if (!p)
{
std::scoped_lock l(m);
if (!p)
{
p.reset(new A);
}
}
p->doSomething();
}
但双重检查锁也存在潜在的 race
condition,第一次的检查没上锁,可能与其他线程中被保护的 reset
操作产生竞争。如果当前线程看见其他线程写入了指针,但没看到新创建的对象实例,调用
doSomething 就会出错:
p.reset(new A);
正常的执行步骤:
- 为A对象分配一片内存
- 在分配的内存上调用A的构造函数,构造一个A对象
- 返回该内存的指针,让p指向该内存
但是,编译器并不是一定按照上面的顺序执行,有可能是32。
为了处理 race condition,C++标准库提供了 std::once_flag 和 std::call_once 。
std::once_flag flag;
void f()
{
std::call_once(flag, [] {std::cout << "init..." << "\n";});
std::cout << 1 << "\n";
}
int main()
{
std::thread t1(f);
std::thread t2(f);
std::thread t3(f);
t1.join();
t2.join();
t3.join();
}
每个线程只要使用
std::call_once,在
std::call_once
结束时就能安全地知道指针已被其他线程初始化,而且这比使用 mutex
的开销更小:
std::shared_ptr<A> p;
std::once_flag flag;
void init()
{
p.reset(new A);
}
void f()
{
std::call_once(flag, init);
p->doSomething();
}
std::call_once也可以用在类中:
class A {
private:
std::once_flag flag;
void init() { ... }
public:
void f()
{
std::call_once(flag, &A::init, this);
...
}
};
static 变量的初始化存在潜在的 race condition:变量声明为 static
时,声明后就完成了初始化,一个线程完成了初始化,其他线程仍会抢着定义这个变量。为此,C++11规定
static
变量的初始化只完全发生在一个线程中,直到初始化完成前其他线程都不会做处理,从而避免了race
condition。只有一个全局实例时可以不使用std::call_once
而直接用 static 变量:
class A {
public:
static A& getInstance();
A(const A&) = delete;
A& operator(const A&) = delete;
private:
A() = default;
~A() = default;
};
A& A::getInstance()
{
static A instance; //@ 线程安全的初始化
return instance;
}
保护不常更新的数据结构¶
有些数据(比如缓存中存放的DNS入口表)需要经常访问但更新频率很低,如果用 std::mutex 保护数据有些过度(大量读的操作也会因锁而影响性能),这就需要用上读写锁(reader-writer mutex),它允许多个线程并发读但仅一个线程写。
- C++17提供了 std::shared_mutex和 std::shared_timed_mutex(C++14),后者比前者提供了更多操作,但前者性能更高。C++11没有提供读写锁,为此可使用 boost::shared_mutex 。
- C++14提供了std::shared_lock,用法和 std::unique_lock 相同,此外std::shared_lock还允许多线程同时获取共享锁,因此一般用 std::shared_lock 锁定读,std::unique_lock 锁定写。
- 读写锁并不是万能的,其性能与处理器数量及读写线程的负载有关。
class A {
private:
mutable std::shared_mutex m;
int n = 0;
public:
int read()
{
std::shared_lock<std::shared_mutex> l(m);
return n;
}
void write()
{
std::unique_lock<std::shared_mutex> l(m);
++n;
}
};
递归锁¶
一个线程已经获取 std::mutex(即已上锁)后再次上锁就会产生未定义行为:
std::mutex m;
void f()
{
m.lock();
m.unlock();
}
void g()
{
m.lock();
f();
m.unlock();
}
int main()
{
std::thread t(g);
t.join(); //@ 产生未定义行为
}
为了允许这种情况,C++ 提供了 std::recursive_mutex,它可以在一个线程上多次获取锁,但在其他线程获取锁之前必须释放所有的锁。
多数情况下,如果需要递归锁,说明代码设计存在问题。比如一个类的每个成员函数都会上锁,一个成员函数调用另一个成员函数,就可能多次上锁,这种情况用递归锁就可以避免产生未定义行为。但显然这个设计本身是有问题的,更好的办法是提取其中一个函数作为
private 成员并且不上锁,其他成员先上锁再调用该函数。