线程间共享数据

如果共享数据是只读的,那么只读操作不会影响到数据,更不会涉及对数据的修改,所以所有线程都会获得同样的数据。但是,当一个或多个线程要修改共享数据时,就会产生很多麻烦。

线程间共享数据存在的问题

不变量(invariant):关于一个特定数据结构总为 true 的语句。比如:双向链表的两个相邻节点 A 和 B,A 的后指针一定指向 B,B 的前指针一定指向 A。有时程序为了方便会暂时破坏不变量,这通常发生于更新复杂数据结构的过程中,比如删除双向链表中的一个节点 N,要先让 N 的前一个节点指向N的后一个节点(不变量被破坏),再让 N 的后节点指向前节点,最后删除 N(此时不变量重新恢复)。

  • 线程修改共享数据时,就会发生破坏不变量的情况,此时如果有其他线程访问,就可能导致不变量被永久性破坏,这就是 race condition。
  • 如果线程执行顺序的先后对结果无影响,则为不需要关心的良性竞争。需要关心的是不变量被破坏时产生的 race condition。
  • C++ 标准中定义了 data race 的概念,指代一种特定的 race condition,即并发修改单个对象。data race 会造成未定义行为。

避免恶性条件竞争

  • 对数据结构采用某种保护机制,确保只有进行修改的线程才能看到不变量被破坏时的中间状态。从其他访问线程的角度来看,修改不是已经完成了,就是还没开始。
  • 对数据结构和不变量的设计进行修改,修改完的结构必须能完成一系列不可分割的变化,也就是保证每个不变量保持稳定的状态,这就是所谓的无锁编程。
  • 使用事务的方式去处理数据结构的更新。所需的一些数据和读取都存储在事务日志中,然后将之前的操作合为一步,再进行提交。当数据结构被另一个线程修改后,或处理已经重启的情况下,提交就会无法进行,这称作为“软件事务内存”。

使用mutex保护数据

使用 mutex 在访问共享数据前加锁,访问结束后解锁。一个线程用特定的 mutex 锁定后,其他线程必须等待该线程的 mutex 解锁才能访问共享数据。

std::list<int> v;
std::mutex m;

void f(int n)
{
    //@ C++17中引入了类模板实参推断,可简写为std::lock_guard l(m);
    std::lock_guard<std::mutex> l(m);
    v.emplace_back(n);
}

bool listContains(int n)
{
    std::lock_guard<std::mutex> l(m);
    return std::find(std::begin(v), std::end(v), n) != std::end(v);
}
std::scoped_lock g(m1, m2);

一般 mutex 和要保护的数据一起放在类中,定义为 private 数据成员,而非全局变量,这样能让代码更清晰。但如果某个成员函数返回指向数据成员的指针或引用,则通过这个指针的访问行为不会被 mutex 限制,因此需要谨慎设置接口,确保 mutex 能锁住数据。

class A {
private:
    int i;
public:
    void doSomething();
};

class B {
private:
    A data;
    std::mutex m;
public:
    template<typename F>
    void processData(F f)
    {
        std::scoped_lock l(m);
        f(data);  //@ 传递一个被保护的数据给用户函数
    }
};

A* p;
void oops(A& a)
{
    p = &a;
}

B b;
void foo()
{
    b.processData(oops); //@ 传递了一个恶意函数
    p->doSomething(); //@ 未锁定mutex的情况下访问数据
}

接口内在的条件竞争

即便在很简单的接口中,也可能遇到 race condition:

std::stack<int> s;
if (!s.empty())
{
    int n = s.top();
    s.pop();
}
  • 上述代码先检查非空再获取栈顶元素,在单线程中是安全的,但在多线程中,检查非空之后,如果其他线程先pop,就会导致当前线程 top 出错。这是一个经典的 race condition,即使用 mutex 也不能阻止,这就是接口固有的问题,解决方法是改变接口的设计。
  • 另一个潜在的竞争是,如果两个线程都还没 pop,而是分别获取了 top,虽然不会产生未定义行为,但这种对同一值处理了两次的行为更为严重,因为看起来没有任何错误,很难定位 bug。
  • 既然如此,为什么不直接让 pop 返回栈顶元素。原因在于,假设有一个 stack<vector<int>>,拷贝 vector 时需要在堆上分配内存,如果系统负载严重或资源有限(比如 vector 有大量元素),vector 的拷贝构造函数就会抛出 std::bad_alloc 异常。如果 pop 可以返回栈顶元素值,返回一定是最后执行的语句,stack 在返回前已经弹出了元素,但如果拷贝返回值时抛出异常,就会导致弹出的数据丢失(从栈上移除但拷贝失败)。因此 std::stack 的设计者将这个操作分解为top和pop两部分,但这样的分割却造成了 race condition。

解决方案:

方案1:传入一个引用

将变量的引用作为参数,传入 pop() 函数中获取想要的弹出值:

std::vector<int> result;
some_stack.pop(result);

缺陷:

  • 需要构造出一个栈中类型的实例,用于接收目标值。对于一些类型,这样做是不现实的。
  • 元素类型可能不支持赋值。

方案2:无异常抛出的拷贝构造函数或移动构造函数

对于有返回值的 pop() 函数来说,只有“异常安全”方面的担忧。

使用 std::is_nothrow copy_constructiblestd::is_nothrow_move_constructible 即可保证不抛异常。

缺陷:

  • 过于局限,抛异常的构造函数还是更常见的。

方案3:返回指向弹出值的指针

返回一个指向弹出元素的指针,而不是直接返回值。指针的优势是自由拷贝,并且不会产生异常。

缺陷:

  • 返回一个指针需要对对象的内存分配进行管理,对于简单数据类型(比如:int),内存管理的开销要远大于直接返回值。
  • 使用 std::shared_ptr 是个不错的选择;不仅能避免内存泄露,而且标准库能够完全控制内存分配方案。

综合方案

第四种方案是结合方案一二或者一三,比如结合方案一三实现一个线程安全的 stack

struct emptyStack :std::exception
{
    const char* what()const noexcept
    {
        return "empty stack";
    }
};

template<typename T>
class threadsafe_stack
{
private:
    std::stack<T> s;
    mutuable std::mutex m;

public:
    threadsafe_stack() :s(std::stack()) {}
    threadsafe_stack(const threadsafe_stack& rhs)
    {
        std::lock_guard<std::mutex> l(m);
        s = rhs.s;
    }
    threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    void push(T n)
    {
        std::lock_guard<std::mutex> l(m);
        s.push(std::move(n));
    }

    //@ 返回一个指向栈顶元素的指针
    std::shared_ptr<T> pop()
    {
        std::lock_guard<std::mutex> l(m);
        if (s.empty)
            throw emptyStack();

        const std::shared_ptr<T> res(std::make_shared<T>(std::move(s.top())));
        s.pop();
        return res;
    }

    //@ 传引用获取结果
    void pop(T& n)
    {
        std::lock_guard<std::mutex> l(m);
        if (s.empty)
            throw emptyStack();
        n = std::move(s.top());
        s.pop();
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> l(m);
        return s.empty();
    }
};

死锁

  • 死锁的四个必要条件:互斥、占有且等待、不可抢占、循环等待。
  • 避免死锁通常建议让两个 mutex 以相同顺序上锁,总是先锁A再锁B,但这并不适用所有情况。

std::lock

std::lock 可以一次性锁住多个 mutex,并且没有死锁风险。

class A {
public:
    explicit A(int x) : i(x) {}
    int i;
    std::mutex m;
};

void f(A& from, A& to, int n)
{
    std::lock(from.m, to.m);
    /*
    下面按固定顺序加锁,看似不会有死锁的问题,但如果没有std::lock同时上锁,
    另一线程中执行f(to, from, n),两个锁的顺序就反了过来,从而可能导致死锁
    */

    //@ std::adopt_lock表示获取m的所有权
    std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);
    from.i -= n;
    to.i += n;
}
  • std::lock 可能抛异常,此时就不会上锁,因此 std::lock 保证要么都锁住,要么都不锁。

unique_lock

std::unique_lock<std::mutex> lock1(from.m, std::defer_lock);
std::unique_lock<std::mutex> lock2(to.m, std::defer_lock);
//@ std::defer_lock表示不获取m的所有权,因此m还未上锁
std::lock(lock1, lock2); //@ 此处上锁
std::unique_lock<std::mutex> getLock()
{
    extern std::mutex m;
    std::unique_lock<std::mutex> l(m);
    prepareData();
    return l; //@ 不需要std::move(编译器负责调用移动构造函数)
}

void f()
{
    std::unique_lock<std::mutex> l(getLock());
    doSomething();
}
  • 对一些费时的操作(如文件IO)上锁可能造成很多操作被阻塞,可以在面对这些操作时先解锁
void f()
{
    std::unique_lock<std::mutex> l(m);
    auto data = getData();
    l.unlock(); //@ 费时操作没有必要持有锁,先解锁
    auto res = process(data);
    l.lock(); //@ 为了写入数据再次上锁
    writeResult(data, res);
}
  • 如果支持 C++17,最易最优的同时上锁方法是使用 std::scoped_lock
std::scoped_lock l(from.m, to.m);

避免死锁的建议

  • 建议1:一个线程已经获取一个锁时就不要获取第二个。如果每个线程只有一个锁,锁上就不会产生死锁(但除了互斥锁,其他方面也可能造成死锁,比如即使无锁,线程间相互等待(互相 join)也可能造成死锁)。
  • 建议2:持有锁时避免调用用户提供的代码。用户提供的代码可能做任何事,包括获取锁,如果持有锁时调用用户代码获取锁,就会违反第一个建议,并造成死锁。但有时调用用户代码是无法避免。
  • 建议3:按固定顺序获取锁。如果必须获取多个锁且不能用 std::lock 同时获取,最好在每个线程上用固定顺序获取。
  • 建议4:如果一个锁被低层持有,就不允许再上锁。
class hierarchical_mutex
{
    std::mutex internal_mutex;

    unsigned long const hirearchy_value;
    unsigned long previous_hirearchy_value;
    //@ thread_local 的值来代表当前线程的层级值
    static thread_local unsigned long this_thread_hierarchy_value;

    void check_for_hierarchy_violation()
    {
        if (this_thread_hierarchy_value < hirearchy_value)
        {
            throw std::logic_error("mutex hierachy violated");
        }
    }

    void update_hierarchy_value()
    {
        //@ 先存储当前线程的层级值(用于解锁时恢复)
        previous_hirearchy_value = this_thread_hierarchy_value;
        this_thread_hierarchy_value = hirearchy_value;
    }
public:
    explicit hierarchical_mutex(unsigned long value) :
        hirearchy_value(value),
        previous_hirearchy_value(0)
    {}

    void lock()
    {
        check_for_hierarchy_violation(); //@ 要求线程层级值大于锁的层级值
        internal_mutex.lock();
        update_hierarchy_value();
    }

    void unlock()
    {
        this_thread_hierarchy_value = previous_hirearchy_value;
        internal_mutex.unlock();
    }

    bool try_lock()
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        update_hierarchy_value();
        return true;
    }
};


//@ this_thread_hierarchy_value 被初始化为最大值,所以最初所有线程都能被锁住。
//@ 因为其声明中有 thread_local,所以每个线程都有其拷贝副本
//@ 这样线程中变量状态完全独立,当从另一个线程进行读取时,变量的状态也完全独立。
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

使用示例:

hierarchical_mutex high(10000);
hierarchical_mutex mid(6000);
hierarchical_mutex low(5000);


void lf() //@ 最低层函数
{
    std::scoped_lock l(low);
}

void hf()
{
    std::scoped_lock l(high);
    lf();  //@ 可以调用低层函数
}

void mf()
{
    std::scoped_lock l(mid);
    hf(); //@ 中层调用了高层函数,违反了层次结构
}

其他保护共享数据的可选方式

保护共享数据的初始化

延迟初始化在单线程中很常见:

std::shared_ptr<A> P;
void f()
{
    if (!p)
    {
        p.reset(new A); //@ 在多线程中这里需要保护
    }
    p->doSomething();
}

但在多线程直接上锁会导致不必要的线程资源阻塞:

std::shared_ptr<A> P;
std::mutex m;

void f()
{
    std::unique_lock<std::mutex> l(m); //@ 所有线程会在此处阻塞
    if (!p)
    {
        p.reset(new A);
    }
    l.unlock();
    p->doSomething();
}

常见的方法是双重检查锁模式:

void f()
{
    if (!p)
    {
        std::scoped_lock l(m);
        if (!p)
        {
            p.reset(new A);
        }
    }
    p->doSomething();
}

但双重检查锁也存在潜在的 race condition,第一次的检查没上锁,可能与其他线程中被保护的 reset 操作产生竞争。如果当前线程看见其他线程写入了指针,但没看到新创建的对象实例,调用 doSomething 就会出错:

p.reset(new A);

正常的执行步骤:

  1. 为A对象分配一片内存
  2. 在分配的内存上调用A的构造函数,构造一个A对象
  3. 返回该内存的指针,让p指向该内存

但是,编译器并不是一定按照上面的顺序执行,有可能是32。

为了处理 race condition,C++标准库提供了 std::once_flagstd::call_once

std::once_flag flag;

void f()
{
    std::call_once(flag, [] {std::cout << "init..." << "\n";});
    std::cout << 1 << "\n";
}

int main()
{
    std::thread t1(f);
    std::thread t2(f);
    std::thread t3(f);

    t1.join();
    t2.join();
    t3.join();
}

每个线程只要使用 std::call_once,在 std::call_once 结束时就能安全地知道指针已被其他线程初始化,而且这比使用 mutex 的开销更小:

std::shared_ptr<A> p;
std::once_flag flag;

void init()
{
    p.reset(new A);
}

void f()
{
    std::call_once(flag, init);
    p->doSomething();
}

std::call_once也可以用在类中:

class A {
private:
    std::once_flag flag;
    void init() { ... }
public:
    void f()
    {
        std::call_once(flag, &A::init, this);
        ...
    }
};

static 变量的初始化存在潜在的 race condition:变量声明为 static 时,声明后就完成了初始化,一个线程完成了初始化,其他线程仍会抢着定义这个变量。为此,C++11规定 static 变量的初始化只完全发生在一个线程中,直到初始化完成前其他线程都不会做处理,从而避免了race condition。只有一个全局实例时可以不使用std::call_once 而直接用 static 变量:

class A {
public:
    static A& getInstance();
    A(const A&) = delete;
    A& operator(const A&) = delete;
private:
    A() = default;
    ~A() = default;
};

A& A::getInstance()
{
    static A instance; //@ 线程安全的初始化
    return instance;
}

保护不常更新的数据结构

有些数据(比如缓存中存放的DNS入口表)需要经常访问但更新频率很低,如果用 std::mutex 保护数据有些过度(大量读的操作也会因锁而影响性能),这就需要用上读写锁(reader-writer mutex),它允许多个线程并发读但仅一个线程写。

class A {
private:
    mutable std::shared_mutex m;
    int n = 0;
public:
    int read()
    {
        std::shared_lock<std::shared_mutex> l(m);
        return n;
    }
    void write()
    {
        std::unique_lock<std::shared_mutex> l(m);
        ++n;
    }
};

递归锁

一个线程已经获取 std::mutex(即已上锁)后再次上锁就会产生未定义行为:

std::mutex m;

void f()
{
    m.lock();
    m.unlock();
}

void g()
{
    m.lock();
    f();
    m.unlock();
}

int main()
{
    std::thread t(g);
    t.join(); //@ 产生未定义行为
}

为了允许这种情况,C++ 提供了 std::recursive_mutex,它可以在一个线程上多次获取锁,但在其他线程获取锁之前必须释放所有的锁。

多数情况下,如果需要递归锁,说明代码设计存在问题。比如一个类的每个成员函数都会上锁,一个成员函数调用另一个成员函数,就可能多次上锁,这种情况用递归锁就可以避免产生未定义行为。但显然这个设计本身是有问题的,更好的办法是提取其中一个函数作为 private 成员并且不上锁,其他成员先上锁再调用该函数。