基于锁的并发数据结构的设计

设计并发数据结构的建议

设计并发数据结构要考虑两点,一是确保访问thread-safe,二是允许真正的并发访问。thread-safe 基本要求如下

  • 数据结构的不变量被一个线程破坏时,确保不被线程看到此状态。
  • 提供操作完整的函数来避免数据结构接口中固有的 race condition。
  • 注意数据结构出现异常时的行为,以确保不变量不被破坏。
  • 限制锁的范围,避免可能的嵌套锁,最小化死锁的概率。

关于真正的并发访问没有太多建议,但作为数据结构的设计者需要考虑以下问题

  • 部分操作是否可以在锁的范围外执行。
  • 数据结构的不同部分是否被不同的 mutex 保护。
  • 是否所有操作需要同级别的保护。
  • 在不影响操作语义的前提下,能否对数据结构做一个简单的修改提高并发访问的概率。

这些问题可以总结为一点,即如何最小化线程对被 mutex 保护的数据的轮流访问,以及最大化真实的并发量。

使用锁实现thread-safe stack

struct emptyStack : std::exception
{
    const char* what() const noexcept
    {
        return "empty stack!";
    }
};

template<typename T>
class A {
    std::stack<T> s;
    mutable std::mutex m;
public:
    A() : s(std::stack<T>()) {}

    A(const A& rhs)
    {
        std::lock_guard<std::mutex> l(rhs.m);
        s = rhs.s;
    }

    A& operator=(const A&) = delete;

    void push(T n)
    {
        std::lock_guard<std::mutex> l(m);
        s.push(std::move(n));
    }

    std::shared_ptr<T> pop() //@ 返回一个指向栈顶元素的指针
    {
        std::lock_guard<std::mutex> l(m);
        if (s.empty()) throw emptyStack();
        const std::shared_ptr<T> res(std::make_shared<T>(std::move(s.top())));
        s.pop();
        return res;
    }

    void pop(T& n) //@ 传引用获取结果
    {
        std::lock_guard<std::mutex> l(m);
        if (s.empty()) throw emptyStack();
        n = std::move(s.top());
        s.pop();
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> l(m);
        return s.empty();
    }
};

使用锁和条件变量实现thread-safe queue

template<typename T>
class A {
    mutable std::mutex m;
    std::queue<T> q;
    std::condition_variable cv;
public:
    A() {}
    A(const A& rhs)
    {
        std::lock_guard<std::mutex> l(rhs.m);
        q = rhs.q;
    }

    void push(T x)
    {
        std::lock_guard<std::mutex> l(m);
        q.push(std::move(x));
        cv.notify_one();
    }

    void wait_and_pop(T& x)
    {
        std::unique_lock<std::mutex> l(m);
        //@ 在queue中有元素时才返回,而不必持续调用empty
        cv.wait(l, [this] { return !q.empty(); });
        x = std::move(q.front());
        q.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return !q.empty(); });
        std::shared_ptr<T> res(std::make_shared<T>(std::move(q.front())));
        q.pop();
        return res;
    }

    bool try_pop(T& x)
    {
        std::lock_guard<std::mutex> l(m);
        if (q.empty()) return false;
        x = std::move(q.front());
        q.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> l(m);
        if (q.empty()) return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(std::move(q.front())));
        q.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> l(m);
        //@ 其他线程可能有此对象(拷贝构造)所以要上锁
        return q.empty();
    }
};

这引入了一个异常安全问题,多个线程处于等待时,notify_one 只会唤醒一个线程,如果这个线程在wait_and_pop中(比如构造 std::shared_ptr 对象时就可能)抛出异常,其余线程将永远不会唤醒。

  • notify_one 改为 notify_all,这样就会唤醒所有线程,代价是大部分线程发现 queue 仍为 empty 时,又会继续休眠。
  • 抛出异常时让 wait_and_pop 调用 notify_one,从而唤醒另一个线程。
  • std::shared_ptr 的初始化移到 push 中,并且内部的 std::queue 不直接存储值,而是存储管理值的std::shared_ptr
#include <memory>
#include <mutex>
#include <condition_variable>
#include <queue>

template<typename T>
class A {
    mutable std::mutex m;
    std::queue<std::shared_ptr<T>> q; //@ 之前为std::queue<T> q
    std::condition_variable cv;
public:
    A() {}
    A(const A& rhs)
    {
        std::lock_guard<std::mutex> l(rhs.m);
        q = rhs.q;
    }

    void push(T x)
    {
        std::shared_ptr<T> data(std::make_shared<T>(std::move(x)));
        //@ 上面的构造在锁外完成,之前只能在pop中且持有锁时完成
        //@ 内存分配操作开销很大,这种做法减少了mutex的持有时间,提升了性能
        std::lock_guard<std::mutex> l(m);
        q.push(data); //@ 之前为q.push(std::move(x))
        cv.notify_one();
    }

    void wait_and_pop(T& x)
    {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return !q.empty(); });
        x = std::move(*q.front()); //@ 之前为std::move(q.front())
        q.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return !q.empty(); });
        std::shared_ptr<T> res = q.front();
        //@ 之前为std::make_shared<T>(std::move(q.front()))
        //@ 现在构造转移到了push中
        q.pop();
        return res;
    }

    bool try_pop(T& x)
    {
        std::lock_guard<std::mutex> l(m);
        if (q.empty()) return false;
        x = std::move(*q.front()); //@ 之前为std::move(q.front())
        q.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> l(m);
        if (q.empty()) return std::shared_ptr<T>();
        std::shared_ptr<T> res = q.front();
        //@ 之前为std::make_shared<T>(std::move(q.front()))
        q.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> l(m);
        return q.empty();
    }
};

使用细粒度锁和条件变量实现thread-safe queue

thread-safe stack 一样,使用 mutex 保护了整个数据结构,但限制了对并发的支持。多线程在各种成员函数中被阻塞,而只有一个线程能在同一时间做任何事。不过这种限制主要是因为内部实现使用的是 std::queue,为了支持更高级别的并发就需要提供更细粒度的锁,为了提供更细粒度的锁就要换一种实现方式。

最简单的 queue 实现方式是包含头尾指针的单链表

template<typename T>
class A {
    struct node {
        T val;
        std::unique_ptr<node> next;
        node(T x) : val(std::move(x)) {}
    };
    std::unique_ptr<node> head;
    node* tail;
public:
    A() {}
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    void push(T x)
    {
        std::unique_ptr<node> p(new node(std::move(x))); //@ 新建一个值为x的节点
        node* const oldHead = p.get(); //@ 获取新节点的原始指针
        if (tail)
        { //@ 2:如果尾节点不为空则next设为新节点
            tail->next = std::move(p);
        }
        else
        { //@ 如果尾节点为空(说明无元素)则头节点设为新节点
            head = std::move(p);
        }
        tail = oldHead; //@ tail设为新节点的原始指针
    }

    std::shared_ptr<T> try_pop()
    {
        if (!head)
            return std::shared_ptr<T>(); //@ 头节点为空则返回空指针
        std::shared_ptr<T> res(std::make_shared<T>(std::move(head->val))); //@ 保存头节点值
        std::unique_ptr<node> oldHead = std::move(head); //@ 获取头节点
        head = std::move(oldHead->next); //@ 1:头节点指向下一个节点
        return res; //@ 返回之前保存的头节点的值
    }
};

这是一个单线程下没问题的 queue,但在多线程下就有明显问题,即使用两个 mutex 分别保护头尾指针。push 可以同时修改头尾指针,会对两个 mutex 上锁,更严重的是 pushtry_pop 都能访问 next 节点(见注释1和2处),仅有一个元素时头尾指针相等,两处的 next 也是同一对象,于是 try_popnextpushnext 就产生了竞争,锁的也是同一个 mutex

这个问题可以通过在 queue 的最后预设一个 dummy node 解决:

template<typename T>
class A {
    struct node {
        std::shared_ptr<T> val; //@ 之前为T val
        std::unique_ptr<node> next;
    };
    std::unique_ptr<node> head;
    node* tail;
public:
    A() : head(new node), tail(head.get()) {} //@ 预设头尾节点指向一处(未存储任何值)
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    void push(T x)
    { //@ 现在push只访问tail而不访问head,意味着不再会与try_pop操作同一节点而争夺锁
        std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x))); //@ 新建一个保存新值的指针
        std::unique_ptr<node> p(new node); //@ 新建一个不存储值的新节点(用作新的尾节点)
        node* const newTail = p.get(); //@ 获取新节点的原始指针
        tail->val = newVal; //@ 当前尾节点的值设为新值(原本不存储值)
        tail->next = std::move(p); //@ tail->next设为新节点
        tail = newTail; //@ 最后令tail指向新节点
    }

    std::shared_ptr<T> try_pop()
    { //@ 同时访问head和tail只在最初的比较上,锁是短暂的
      //@ 之前的判断条件为if (!head),现在为头节点与尾节点指向一处
        if (head.get() == tail)
            return std::shared_ptr<T>(); //@ 返回空指针
        std::shared_ptr<T> res(head->val); //@ 保存头节点的值,现在为std::shared_ptr类型
        std::unique_ptr<node> oldHead = std::move(head); //@ 获取旧的头节点
        head = std::move(oldHead->next); //@ 头节点指向下一个节点
        return res; //@ 返回之前保存的头节点的值
    }
};

接着加上锁,锁的范围应该尽可能小。对于 push 来说很简单,对 tail 的访问上锁即可。对于 try_pop 来说,在弹出 head(即将 head 设为 nex t)之前都应该对 head 加锁,在最后返回时则不需要锁,因此可以把加锁的部分提取到一个新函数中。在比较 headtail 时,对 tail 也存在短暂的访问,因此对 tail 的访问也需要加锁。

template<typename T>
class A {
    struct node {
        std::shared_ptr<T> val;
        std::unique_ptr<node> next;
    };
    std::unique_ptr<node> head;
    node* tail;
    std::mutex hm; //@ head mutex
    std::mutex tm; //@ tail mutex

    node* get_tail()
    {
        std::lock_guard<std::mutex> l(tm);
        return tail;
    }

    std::unique_ptr<node> pop_head()
    {
        std::lock_guard<std::mutex> l(hm);
        if (head.get() == get_tail())
            return nullptr;
        std::unique_ptr<node> oldHead = std::move(head);
        head = std::move(oldHead->next);
        return oldHead;
    }
public:
    A() : head(new node), tail(head.get()) {}
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    //@ push有两种可能产生异常的情况:一是给mutex上锁,但数据在上锁成功后才会修改
    //@ 二是构建智能指针对象时可能抛出异常,但智能指针本身是异常安全的,异常时会释放
    void push(T x) //@ 因此push是异常安全的
    {
        std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x)));
        std::unique_ptr<node> p(new node);
        node* const newTail = p.get();
        std::lock_guard<std::mutex> l(tm); //@ 此处加锁
        tail->val = newVal;
        tail->next = std::move(p);
        tail = newTail;
    }
    //@ 同理try_pop也是异常安全的
    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> oldHead = pop_head();
        return oldHead ? oldHead->val : std::shared_ptr<T>();
    }
};

注意在 try_pop 中,tail mutex 要在 head mutex 之中加锁:

std::unique_ptr<node> pop_head()
{
    node* const oldTail = get_tail(); //@ 把tail mutex移到head mutex之外是有问题的
    //@ 这两行代码之间,head和tail都可能被改变
    std::lock_guard<std::mutex> l(hm);
    if (head.get() == oldTail) //@ 比较的是最新的head和最老的tail
    ...
}

这个实现在并发度上也比最初版本的 queue 要好。由于使用了细粒度锁,push 中创建新值和新节点都没上锁,多线程并发创建新值和新节点就不是问题。同一时间内,只有一个线程能添加新节点,但这只需要一个指针赋值操作,持有锁的时间很短。

try_pop 中对 tail mutex 的持有时间也很短,只是用来做一次比较,因此 try_poppush 几乎可以同时调用。try_pop 中持有 head mutex 所做的也只是指针赋值操作,开销较大的析构操作在锁外进行(智能指针的析构特性)。这意味着同一时间内,虽然只有一个线程能调用 pop_head ,但允许多个线程删除节点并安全返回数据,这就提升了 try_pop 的并发调用数量。

接着还需要实现 wait_and_pop,下面是最终版本的 thread-safe queue :

#include <memory>
#include <mutex>
#include <condition_variable>

template<typename T>
class A {
    struct node {
        std::shared_ptr<T> val;
        std::unique_ptr<node> next;
    };
    std::unique_ptr<node> head;
    node* tail;
    std::mutex hm; //@ head mutex
    std::mutex tm; //@ tail mutex
    std::condition_variable cv;

    node* get_tail()
    {
        std::lock_guard<std::mutex> l(tm);
        return tail;
    }

    std::unique_ptr<node> pop_head()
    {
        std::unique_ptr<node> oldHead = std::move(head);
        head = std::move(oldHead->next);
        return oldHead;
    }

    std::unique_lock<std::mutex> wait_for_data()
    {
        std::unique_lock<std::mutex> l(hm);
        cv.wait(l, [&] { return head.get() != get_tail(); });
        return std::move(l);
    }

    std::unique_ptr<node> wait_pop_head()
    {
        std::unique_lock<std::mutex> l(wait_for_data());
        return pop_head();
    }

    std::unique_ptr<node> wait_pop_head(T& x)
    {
        std::unique_lock<std::mutex> l(wait_for_data());
        x = std::move(*head->val); //@ 即*(head->val)
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head()
    {
        std::lock_guard<std::mutex> l(hm);
        if (head.get() == get_tail()) return std::unique_ptr<node>();
        return pop_head();
    }

    std::unique_ptr<node> try_pop_head(T& x)
    {
        std::lock_guard<std::mutex> l(hm);
        if (head.get() == get_tail()) return std::unique_ptr<node>();
        x = std::move(*head->val);
        return pop_head();
    }
public:
    A() : head(new node), tail(head.get()) {}
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    void push(T x)
    {
        std::shared_ptr<T> newVal(std::make_shared<T>(std::move(x)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> l(tm);
            tail->val = newVal;
            node* const newTail = p.get();
            tail->next = std::move(p);
            tail = newTail;
        }
        cv.notify_one();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_ptr<node> oldHead = wait_pop_head();
        return oldHead->val;
    }

    void wait_and_pop(T& x)
    {
        std::unique_ptr<node> oldHead = wait_pop_head(x);
    }

    std::shared_ptr<T> try_pop()
    {
        std::unique_ptr<node> oldHead = try_pop_head();
        return oldHead ? oldHead->val : std::shared_ptr<T>();
    }

    bool try_pop(T& x)
    {
        std::unique_ptr<node> oldHead = try_pop_head(x);
        return oldHead;
    }

    bool empty()
    {
        std::lock_guard<std::mutex> l(hm);
        return head.get() == get_tail();
    }
};

这个实现是之后实现 lock-free queue 的基础。它是一个 unbounded queue,只要内存足够,线程就能持续push 新值。而 bounded queue 会限定元素数量的最大值,当 queue 填满时,push 会失败或者阻塞至 pop 一个元素,这对分配线程工作是有用的。将 unbounded queue 扩展成 bounded queue 不是难事,这里不再赘述。

使用锁实现thread-safe lookup table

lookup table 就是通过 key 查询 value 的数据结构,比如 std::map,但标准库关联容器的接口同样不适合并发访问,最大的问题在于迭代器,其他线程删除元素时导致迭代器失效,因此 lookup table 的接口设计就要跳过迭代器。

为了使用细粒度锁,就不应该使用标准库容器。可选的关联容器数据结构有三种:

  • 第一种方式是二叉树(如红黑树),但每次查找修改都要从访问根节点开始,也就表示根节点需要上锁,尽管沿着树向下访问节点时会解锁,但这个比起覆盖整个数据结构的单个锁好不了多少。
  • 第二种方式是有序数组,这比二叉树还差,因为无法提前得知一个给定的值应该放在哪,于是同样需要一个覆盖整个数组的锁。
  • 第三种方式是哈希表。假如有一个固定数量的桶,一个 key 属于哪个桶就取决于 key 的属性和哈希函数,这意味着可以安全地分开锁住每个桶。如果复用一个支持多个读单个写的 mutex,就能将并发度提高相当于桶数量的倍数。
template<typename K, typename V, typename Hash = std::hash<K>>
class A {
    class Bucket {
    public:
        std::list<std::pair<K, V>> data;
        mutable std::shared_mutex m; //@ 每个桶都用这个锁保护

        V value_for(const K& k, const V& v) const //@ 如果未找到则返回v
        { //@ 没有修改任何值,异常安全
            std::shared_lock<std::shared_mutex> l(m); //@ 只读锁,可共享
            auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
            return it == data.end() ? v : it->second;
        }

        void add_or_update_mapping(const K& k, const V& v) //@ 找到则修改,未找到则添加
        {
            std::unique_lock<std::shared_mutex> l(m); //@ 写,单独占用
            auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
            if (it == data.end())
            {
                data.emplace_back(k, v); //@ emplace_back异常安全
            }
            else
            {
                it->second = v; //@ 赋值可能抛异常,但值是用户提供的,可放心让用户处理
            }
        }

        void remove_mapping(const K& k)
        { //@ std::list::erase不会抛异常,因此异常安全
            std::unique_lock<std::shared_mutex> l(m); //@ 写,单独占用
            auto it = std::find_if(data.begin(), data.end(), [&](auto& x) { return x.first == k; });
            if (it != data.end()) data.erase(it);
        }
    };

    std::vector<std::unique_ptr<Bucket>> buckets;
    Hash hasher;
    Bucket& get_bucket(const K& k) const
    { //@ 桶数固定因此可以无锁调用
        return *buckets[hasher(k) % buckets.size()];
    }
public:
    //@ 桶的默认数量为19(一般用x%桶数决定放置x的桶的索引,桶数为质数可以使得桶分布均匀)
    A(unsigned n = 19, const Hash& h = Hash{}) : buckets(n), hasher(h)
    {
        for (auto& x : buckets)
            x.reset(new Bucket);
    }
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    V value_for(const K& k, const V& v = V{}) const
    {
        return get_bucket(k).value_for(k, v);
    }

    void add_or_update_mapping(const K& k, const V& v)
    {
        get_bucket(k).add_or_update_mapping(k, v);
    }

    void remove_mapping(const K& k)
    {
        get_bucket(k).remove_mapping(k);
    }
    //@ 为了方便使用,提供一个到std::map的映射
    std::map<K, V> get_map() const
    {
        std::vector<std::unique_lock<std::shared_mutex>> l;
        for (auto& x : buckets)
        {
            l.push_back(std::unique_lock<std::shared_mutex>(x->m));
        }
        std::map<K, V> res;
        for (auto& x : buckets)
        {
            for (auto& y : x->data)
                res.insert(y);
        }
        return res;
    }
};

使用锁实现thread-safe list

template<typename T>
class A {
    struct node {
        std::mutex m;
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
        node() : next() {}
        node(const T& x) : data(std::make_shared<T>(x)) {}
    };
    node head;
public:
    A() {}
    ~A() { remove_if([](const node&) { return true; }); }
    A(const A&) = delete;
    A& operator=(const A&) = delete;
    void push_front(const T& x)
    {
        std::unique_ptr<node> newNode(new node(x));
        std::lock_guard<std::mutex> l(head.m);
        newNode->next = std::move(head.next);
        head.next = std::move(newNode);
    }

    template<typename F>
    void for_each(F f)
    {
        node* cur = &head;
        std::unique_lock<std::mutex> l(head.m);
        while (node* const next = cur->next.get())
        {
            std::unique_lock<std::mutex> nextLock(next->m);
            l.unlock(); //@ 锁住了下一节点,因此可以释放上一节点的锁
            f(*next->data);
            cur = next; //@ 当前节点指向下一节点
            l = std::move(nextLock); //@ 转交下一节点锁的所有权,循环上述过程
        }
    }

    template<typename F>
    std::shared_ptr<T> find_first_if(F f)
    {
        node* cur = &head;
        std::unique_lock<std::mutex> l(head.m);
        while (node* const next = cur->next.get())
        {
            std::unique_lock<std::mutex> nextLock(next->m);
            l.unlock();
            if (f(*next->data))
                return next->data; //@ f返回true时则返回目标值,无需继续查找
            cur = next;
            l = std::move(nextLock);
        }
        return std::shared_ptr<T>();
    }

    template<typename F>
    void remove_if(F f)
    {
        node* cur = &head;
        std::unique_lock<std::mutex> l(head.m);
        while (node* const next = cur->next.get())
        {
            std::unique_lock<std::mutex> nextLock(next->m);
            if (f(*next->data))
            { //@ f为true时则移除下一节点
                std::unique_ptr<node> oldNext = std::move(cur->next);
                cur->next = std::move(next->next); //@ 下一节点设为下下节点
                nextLock.unlock();
            }
            else
            { //@ 否则继续转至下一节点
                l.unlock();
                cur = next;
                l = std::move(nextLock);
            }
        }
    }
};