Contents
无锁并发数据结构的设计¶
非阻塞数据结构¶
阻塞的算法和数据结构使用
mutex、条件变量、期值来同步数据,但非阻塞不等价于
lock-free,比如自旋锁:
class spinlock_mutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
void lock()
{
while (flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};
spinlock_mutex m;
void f(int n)
{
for (int i = 0; i < 100; ++i)
{
m.lock();
std::cout << "Output from thread " << n << '\n';
m.unlock();
}
}
int main()
{
std::vector<std::thread> v;
for (int i = 0; i < 10; ++i)
v.emplace_back(f, i);
for (auto& x : v) x.join();
}
这里没有使用任何阻塞函数的调用,因此使用这个自旋锁的代码是非阻塞的,但并非
lock-free 。
非阻塞数据结构由松到严可分为三个等级:obstruction-free、lock-free、wait-free。
obstruction-free(无障碍):如果其他线程都暂停了,任何一个给定的线程都会在有限步数内完成操作。上例就是这种情况,但这种情况很少见,所以满足这个条件只能算一个失败的lock-free实现。lock-free(无锁):如果多线程在同一个数据结构上操作,其中一个将在有限步数内完成操作。满足lock-free必定满足obstruction-free。wait-free(无等待):如果多线程在同一个数据结构上操作,每个线程都会在有限步数内完成操作。满足wait-free必定满足lock-free,但wait-free很难实现,因为要保证有限步数内完成操作,就要保证操作一次通过,并且执行到某一步不能导致其他线程操作失败。
lock-free
数据结构必须允许多线程并发访问,但它们不能做相同操作,比如一个
lock-free 的 queue 允许一个线程 push、另一个线程
pop,但不允许两个线程同时 push。此外,如果一个访问
lock-free
数据结构的线程被中途挂起,其他线程必须能完成操作而不需要等待挂起的线程。
使用 lock-free
数据结构主要是为了最大化并发访问,不需要阻塞。第二个原因是鲁棒性,如果线程在持有锁时死掉就会导致数据结构被永久破坏,而对
lock-free
数据结构来说,除了死掉的线程里的数据,其他的数据都不会丢失。lock-free没有任何锁,所以一定不会出现死锁。
但 lock-free 可能造成更大开销,用于 lock-free
的原子操作比非原子操作慢得多,且 lock-free
数据结构中的原子操作一般比 lock-based
中的多,此外,硬件必须访问同一个原子变量以在线程间同步数据。无论
lock-free 还是
lock-based,性能方面的检查(最坏情况等待时间、平均等待时间、总体执行时间或其他方面)都是非常重要的。
lock-free thread-safe stack¶
最简单的 stack 实现方式是包含头节点指针的链表。push
的过程很简单,创建一个新节点,然后让新节点的next 指针指向当前
head,最后 head 设为新节点。
这里的 race condition 在于,如果两个线程同时
push,让各自的新节点的 next 指针指向当前
head,这样必然导致 head
最终设为二者之一的新节点,而另一个被丢弃。
解决方法是,在最后设置 head 时先进行判断,只有当前 head
与新节点的 next 相等,才将 head 设为新节点,如果不等则让
next 指向当前 head
并重新判断。而这个操作必须是原子的,因此就需要使用compare_exchange_weak,不需要使用
compare_exchange_strong,因为
compare_exchange_weak
在相等时可能替换失败,但替换失败也会返回
false,放在循环里带来的效果是一样的,而
compare_exchange_weak
在一些机器架构上可以产生比
compare_exchange_strong
更优化的代码。
template<typename T>
class A {
struct node {
T val;
node* next;
node(const T& x): val(x) {}
};
std::atomic<node*> head;
public:
void push(const T& x)
{
const auto newNode = new node(x);
newNode->next = head.load();
while (!head.compare_exchange_weak(newNode->next, newNode));
}
};
pop 的过程也很简单,先用一个指针存储当前 head,再将 head
设为 haed->next,最后返回存储的值并删除指针。
这里的 race condition 在于,如果两个线程同时
pop,一个已经删除了原来的
head,另一个线程读取head->next 时,head
已经是空悬指针。因此,这里绕开删除指针这一步,先考虑前几步的实现。
template<typename T>
class A {
public:
void pop(T& n) //@ 传引用获取结果
{
node* oldHead = head.load(); //@ 还需要考虑head为空指针的情况
while (!head.compare_exchange_weak(oldHead, oldHead->next));
n = oldHead->val;
}
};
这里还有两个问题要考虑:
- 一是链表为空时
head为空指针。 - 二是异常安全问题。如果直接返回值,弹出元素一定在返回之前,如果拷贝返回值时抛出异常就会导致元素丢失(移除成功但拷贝失败),因此这里传入一个引用来保存结果。但实际上传引用也不行,如果其他线程移除了节点,就无法解引用被删除的节点,当前线程就不能安全地拷贝数据
如果想安全地返回值,应该返回一个指向数据值的智能指针,如果没有返回值则可以通过返回 nullptr 来表示。
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
node* next;
node(const T& x) : val(std::make_shared<T>(x)) {}
};
std::atomic<node*> head;
public:
void push(const T& x)
{
const auto newNode = new node(x);
newNode->next = head.load();
while (!head.compare_exchange_weak(newNode->next, newNode));
}
std::shared_ptr<T> pop() //@ 还未考虑释放原来的头节点指针
{
node* oldHead = head.load();
while (oldHead && !head.compare_exchange_weak(oldHead, oldHead->next));
return oldHead ? oldHead->val : std::shared_ptr<T>();
}
};
处理内存泄漏¶
释放被移除的 head
的难点在于,一个线程在释放内存时,无法得知其他线程是否持有要释放的指针。当没有其他线程调用
pop 时,就可以任意释放了。这意味着可以用一个计数器来记录调用 pop
的线程数,当计数为1时就可以安全释放了,否则就把要释放的节点添加到一个待删除节点的列表
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
node* next;
node(const T& x) : val(std::make_shared<T>(x)) {}
};
std::atomic<node*> head;
std::atomic<unsigned> cnt; //@ 调用pop的线程数
std::atomic<node*> toDel; //@ 待删除节点的列表的头节点
public:
void push(const T& x)
{
const auto newNode = new node(x);
newNode->next = head.load();
while (!head.compare_exchange_weak(newNode->next, newNode));
}
std::shared_ptr<T> pop()
{
++cnt; //@ 调用pop的线程数加一,表示oldHead正被持有,保证可以被解引用
node* oldHead = head.load();
while (oldHead && !head.compare_exchange_weak(oldHead, oldHead->next));
std::shared_ptr<T> res;
if (oldHead) res.swap(oldHead->val); //@ oldHead一定可以解引用,oldHead->val设为nullptr
try_reclaim(oldHead); //@ 如果计数器为1则释放oldHead,否则添加到待删除列表中
return res; //@ res保存了oldHead->val
}
private:
static void deleteNodes(node* n) //@ 释放n及之后的所有节点
{
while (n)
{
node* tmp = n->next;
delete n;
n = tmp;
}
}
void try_reclaim(node* oldHead)
{
if (cnt == 1) //@ 调用pop的线程数为1则可以进行释放
{
//@ exchange返回toDel值,即待删除列表的头节点,再将toDel设为nullptr
node* n = toDel.exchange(nullptr); //@ 获取待删除列表的头节点
if (--cnt == 0)
{ //@ 没有其他线程,则释放待删除列表中所有节点
deleteNodes(n);
}
else if (n)
{ //@ 如果多于一个线程则继续保存到待删除列表
addToDel(n);
}
delete oldHead; //@ 删除传入的节点
}
else //@ 调用pop的线程数超过1,添加当前节点到待删除列表
{
addToDel(oldHead, oldHead);
--cnt;
}
}
void addToDel(node* n) //@ 把n及之后的节点置于待删除列表之前
{
node* last = n;
while (const auto tmp = last->next) last = tmp; //@ last指向尾部
addToDel(n, last); //@ 添加从n至last的所有节点到待删除列表
}
void addToDel(node* first, node* last)
{
last->next = toDel; //@ 链接到已有的待删除列表之前
//@ 确保最后last->next为toDel,再将toDel设为first,first即新的头节点
while (!toDel.compare_exchange_weak(last->next, first));
}
};
但如果要释放所有节点,必须有一个时刻计数器的值为0。在高负载的情况下,往往不会存在这样的时刻,从而导致待删除节点的列表无限增长。
### Hazard Pointer
另一个释放的思路是,在线程访问节点时,设置一个 hazard pointer,其中保存了线程 ID 和该节点。所有线程的hazard pointer 存储在一个全局数组中,释放时检查该数组,如果其他线程的 hazard pointer 都不包含此节点,则可以直接释放,否则将节点添加到待删除列表中。
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp = get_HazardPointer_for_current_thread();
node* oldHead = head.load();
do { //@ 外循环确保oldHead为最新的head,循环结束后将head设为head->next
node* tmp;
do { //@ 循环至hp设为当前最新的head
tmp = oldHead;
hp.store(oldHead);
oldHead = head.load(); //@ 获取最新的head
} while (oldHead != tmp);
} while (oldHead && !head.compare_exchange_strong(oldHead, oldHead->next));
hp.store(nullptr); //@ 清空hp
std::shared_ptr<T> res;
if (oldHead)
{
res.swap(oldHead->val);
if (outstanding_hazard_pointers_for(oldHead))
{ //@ 如果hp数组中仍存在内部指针与head相等的hp
reclaim_later(oldHead); //@ 将head添加到待删除节点的列表中
}
else
{ //@ 否则释放该节点
delete oldHead;
}
delete_nodes_with_no_hazards(); //@ 释放待删除节点列表中可删除的节点
}
return res;
}
hazard pointer 的实现:
constexpr int maxSize = 100;
struct HazardPointer {
std::atomic<std::thread::id> id;
std::atomic<void*> p;
};
HazardPointer a[maxSize];
class HP {
HazardPointer* hp;
public:
HP(const HP&) = delete;
HP operator=(const HP&) = delete;
HP() : hp(nullptr)
{
for (int i = 0; i < maxSize; ++i)
{
std::thread::id oldId;
if (a[i].id.compare_exchange_strong(oldId, std::this_thread::get_id()))
{ //@ a[i].id == oldId说明a[i]未被设置过,将其设为当前线程ID
hp = &a[i]; //@ 将a[i]分配给该线程的hp
break;
}
}
if (!hp)
{ //@ 遍历数组都已经被设置过,说明没有新的位置可以分配给当前线程
throw std::runtime_error("No hazard pointers available");
}
}
std::atomic<void*>& getPointer()
{
return hp->p;
}
~HP()
{
hp->p.store(nullptr);
hp->id.store(std::thread::id());
}
};
std::atomic<void*>& get_HazardPointer_for_current_thread()
{
thread_local static HP hp; //@ 每个线程都有各自的hazard pointer
return hp.getPointer();
}
bool outstanding_hazard_pointers_for(void* x)
{
for (int i = 0; i < maxSize; ++i)
{
if (a[i].p.load() == x)
return true;
}
return false;
}
待删除节点的列表的实现:
template<typename T>
void f(void* p) //@ 删除器
{
delete static_cast<T*>(p);
}
struct DataToReclaim {
void* data;
std::function<void(void*)> deleter;
DataToReclaim* next;
template<typename T>
DataToReclaim(T* p) : data(p), deleter(&f<T>), next(nullptr) {}
~DataToReclaim()
{
deleter(data);
}
};
std::atomic<DataToReclaim*> toDel; //@ 待删除节点的列表的头节点
void addToDel(DataToReclaim* n)
{
n->next = toDel.load();
while (!toDel.compare_exchange_weak(n->next, n));
}
template<typename T> //@ 这里用模板来实现
void reclaim_later(T* data) //@ 因为DataToReclaim的构造函数是模板
{
addToDel(new DataToReclaim(data));
}
void delete_nodes_with_no_hazards() //@ 释放待删除节点列表中可删除的节点
{
DataToReclaim* cur = toDel.exchange(nullptr);
while (cur)
{
DataToReclaim* const tmp = cur->next;
if (!outstanding_hazard_pointers_for(cur->data))
{
delete cur;
}
else
{
addToDel(cur);
}
cur = tmp;
}
}
hazard pointer
实现简单,并达到了安全释放的目的,但增加了很多开销,因为每次调用 pop
时,删除当前节点前后都要遍历数组进行检查,并且检查时需要原子访问 hazard
pointer 的内部指针,原子操作的开销也较高。
hazard pointer 包含在 IBM 提交的专利申请中,实际上无锁内存回收技术领域十分活跃,大公司都会申请自己的专利。不过在 GPL 协议下仍然允许免费使用这个专利。
引用计数¶
hazard pointer 将使用中的节点存储在一个链表中解决了问题,而引用计数存储的是访问每个节点的线程数量。首先会想到使用自带引用计数的 std::shared_ptr,并且它的操作也是原子的,但它不保证 lock-free,可以用如下方法检查所在平台的 std::shared_ptr 是否 lock-free。
std::shared_ptr<int> p(new int(42));
assert(std::atomic_is_lock_free(&p)); //@ 如果std::shared_ptr在该平台lock-free
如果是,则可以用它来实现 lock-free stack:
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
std::shared_ptr<node> next;
node(const T& x) : val(std::make_shared<T>(x)) {}
};
std::shared_ptr<node> head;
public:
void push(const T& x)
{
const auto newNode = std::make_shared<node>(x);
newNode->next = std::atomic_load(&head);
while (!std::atomic_compare_exchange_weak(&head, &newNode->next, newNode));
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> oldHead = std::atomic_load(&head);
while (oldHead && !std::atomic_compare_exchange_weak(&head, &oldHead, oldHead->next));
if (oldHead)
{
std::atomic_store(&oldHead->next, std::shared_ptr<node>());
return oldHead->val;
}
return std::shared_ptr<T>();
}
~A()
{
while (pop());
}
};
实现上很难为 std::shared_ptr 提供原子操作成员函数,C++20 将支持 std::atomic 并对 std::shared_ptr 弃用上面的原子操作。
并发TS提供了 std::experimental::atomic_shared_ptr,它与 std::atomic 等价,它也不保证 lock-free。下面是对上例的重写。
class A {
struct node {
std::shared_ptr<T> val;
std::experimental::atomic_shared_ptr<node> next;
node(const T& x): val(std::make_shared<T>(x)) {}
};
std::experimental::atomic_shared_ptr<node> head;
public:
void push(const T& x)
{
const auto newNode = std::make_shared<node>(x);
newNode->next = head.load();
while (!head.compare_exchange_weak(newNode->next, newNode));
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> oldHead = head.load();
while (oldHead && !head.atomic_compare_exchange_weak(oldHead, oldHead->next.load()));
if (oldHead)
{
oldHead->next = std::shared_ptr<node>();
return oldHead->val;
}
return std::shared_ptr<T>();
}
~A()
{
while (pop());
}
};
如果 std::shared_ptr 在平台上并非 lock-free,就不能用来管理 lock-free 数据结构的引用计数了,这时只能手动管理。
一种方法是,为每个节点使用内部和外部两个引用计数,两者之和就是节点的引用计数。每当有线程读取节点时,外部计数递增,读取结束时内部计数递减。当没有线程再读取节点时,就不再需要外部计数,每当外部计数递减或被丢失,内部计数就递增。当内部计数为零时,就表示节点可以安全删除了。
template<typename T>
class A {
struct node;
struct cntPtr {
int exCnt; //@ 外部计数
node* p;
};
struct node {
std::shared_ptr<T> val;
std::atomic<int> inCnt; //@ 内部计数
cntPtr next;
node(const T& x) : val(std::make_shared<T>(x)), inCnt(0) {}
};
std::atomic<cntPtr> head;
void increaseHeadCount(cntPtr& oldCnt)
{
cntPtr newCnt;
do {
newCnt = oldCnt;
++newCnt.exCnt; //@ 访问head时递增外部计数,表示该节点正被使用
} while (!head.compare_exchange_strong(oldCnt, newCnt));
oldCnt.exCnt = newCnt.exCnt;
}
public:
void push(const T& x)
{
cntPtr newNode;
newNode.p = new node(x);
newNode.exCnt = 1;
newNode.p->next = head.load();
while (!head.compare_exchange_weak(newNode.p->next, newNode));
}
std::shared_ptr<T> pop()
{
cntPtr oldHead = head.load();
for (;;)
{
increaseHeadCount(oldHead); //@ 外部计数递增表示该节点正被使用
node* const p = oldHead.p; //@ 因此可以安全地访问
if (!p) return std::shared_ptr<T>();
if (head.compare_exchange_strong(oldHead, p->next))
{
std::shared_ptr<T> res;
res.swap(p->val);
//@ 再将外部计数减2加到内部计数,减2是因为,
//@ 节点被删除减1,该线程无法再次访问此节点再减1
const int increaseCount = oldHead.exCnt - 2;
if (p->inCnt.fetch_add(increaseCount) == -increaseCount)
{ //@ 如果内部计数加上increaseCount为0(相加前为-increaseCount)
delete p;
}
return res;
}
else if (p->inCnt.fetch_sub(1) == 1)
{
delete p;
}
}
}
~A()
{
while (pop());
}
};
内存序¶
以上使用的都是默认内存序
std::memory_order_seq_cst,但这是开销最大的内存序,下面根据操作间的依赖关系选择最小内存序。
template<typename T>
class A {
struct node;
struct cntPtr {
int exCnt;
node* p;
};
struct node {
std::shared_ptr<T> val;
std::atomic<int> inCnt;
cntPtr next;
node(const T& x) : val(std::make_shared<T>(x)), inCnt(0) {}
};
std::atomic<cntPtr> head;
void increaseHeadCount(cntPtr& oldCnt)
{
cntPtr newCnt;
do { //@ 比较失败不改变当前值,并可以继续循环,因此可以选择relaxed
newCnt = oldCnt;
++newCnt.exCnt;
} while (!head.compare_exchange_strong(oldCnt, newCnt,
std::memory_order_acquire, std::memory_order_relaxed));
oldCnt.exCnt = newCnt.exCnt;
}
public:
void push(const T& x)
{
cntPtr newNode;
newNode.p = new node(x);
newNode.exCnt = 1;
//@ 下面比较中release保证之前的语句都先执行,因此load可以使用relaxed
newNode.p->next = head.load(std::memory_order_relaxed);
//@ 比较失败不改变当前值,并可以继续循环,因此可以选择relaxed
while (!head.compare_exchange_weak(newNode.p->next, newNode,
std::memory_order_release, std::memory_order_relaxed));
}
std::shared_ptr<T> pop()
{
cntPtr oldHead = head.load(std::memory_order_relaxed);
for (;;)
{
increaseHeadCount(oldHead); //@ acquire
node* const p = oldHead.p;
if (!p) return std::shared_ptr<T>();
if (head.compare_exchange_strong(oldHead, p->next, std::memory_order_relaxed))
{
std::shared_ptr<T> res;
res.swap(p->val);
const int increaseCount = oldHead.exCnt - 2;
//@ swap要先于delete,因此使用release
if (p->inCnt.fetch_add(increaseCount, std::memory_order_release)
== -increaseCount)
{
delete p;
}
return res;
}
else if (p->inCnt.fetch_add(-1, std::memory_order_relaxed) == 1)
{
p->inCnt.load(std::memory_order_acquire); //@ 只是用acquire来同步
//@ acquire保证delete在之后执行
delete p;
}
}
}
~A()
{
while (pop());
}
};
lock-free thread-safe queue¶
queue 与 stack 不同的是,push 和 pop
访问的是数据结构中不同的部分,因此需要确保对其中一方的修改对另一方来说是可见的,即允许修改被另一方访问。以
lock-based queue 为基础,使用原子的头尾指针,实现如下单生产者单消费者
lock-free queue:
template<typename T>
class A {
struct node {
std::shared_ptr<T> val;
node* next;
node(): next(nullptr) {}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* popHead()
{
node* const oldHead = head.load();
if (oldHead == tail.load()) return nullptr;
head.store(oldHead->next);
return oldHead;
}
public:
A(): head(new node), tail(head.load()) {}
A(const A&) = delete;
A& operator=(const A&) = delete;
~A()
{
while (node* const oldHead = head.load())
{
head.store(oldHead->next);
delete oldHead;
}
}
void push(T x)
{
std::shared_ptr<T> newVal(std::make_shared<T>(x));
node* p = new node;
node* const oldTail = tail.load();
oldTail->val.swap(newVal);
oldTail->next = p;
tail.store(p);
}
std::shared_ptr<T> pop()
{
node* oldHead = popHead();
if (!oldHead) return std::shared_ptr<T>();
const std::shared_ptr<T> res(oldHead->val);
delete oldHead;
return res;
}
};
这个实现在只有一个线程 push,只有一个线程 pop,且 push
先于 pop 时,可以完美工作。但如果两个线程同时
push,就会读取同一个尾节点,导致只有一个 push
的结果被保留,这是明显的 race condition。同理,两个线程同时 pop Head
也只会弹出一个节点。
除了确保同时只能有一个线程 pop,还要确保其他线程在访问 head
时能安全地解引用。这个问题类似于 lock-free stack 的
pop,需要确保安全地释放,因此之前 lock-free stack
的方案都可以套用在这里。
假设 pop 的问题已经解决了,现在考虑 push,push
的问题就是多个线程获取的是同一个 tail 节点。第一种解决方法是添加一个
dummy 节点,这样当前的 tail 只需要更新 next 指针,如果一个线程将 next 由
nullptr
设为新节点就说明添加成功,否则重新读取 tail
并添加。这要求对pop也做一些微小的修改,以便丢弃带空数据指针的节点并再次循环。这种方法的缺点是,每次
pop 通常要移除两个节点,并且内存分配是原来的两倍。
第二种方法是使用 std::atomic<std::shared_ptr> 作为节点数据,通过
compare_exchange_strong
对其设置。如果
std::shared_ptr
为lock-free则一切都解决了,如果不是就需要其他方案,比如让 pop
返回std::unique_ptr(对象的唯一引用),并使用原始指针原子类型作为节点数据。
下面对 push 进行修改,为了避免解引用时指针在 pop
中被释放,使用引用计数的方案,存在外部计数时就不能释放节点。
void push(T x)
{
std::unique_ptr<T> newData(new T(x));
cntPtr newNext;
newNext.p = new node;
newNext.exCnt = 1;
for (;;)
{
node* const oldTail = tail.load(); //@ 读取尾节点
T* oldData = nullptr;
//@ 解引用尾节点(oldTail->val)
if (oldTail->val.compare_exchange_strong(oldData, newData.get()))
{
oldTail->next = newNext;
tail.store(newNext.p); //@ 更新尾节点
newData.release();
break;
}
}
}
使用引用计数的 push 的完整实现:
template<typename T>
class A {
struct node;
struct cntPtr {
int exCnt;
node* p;
};
std::atomic<cntPtr> head;
std::atomic<cntPtr> tail;
struct nodeCnt {
unsigned inCnt: 30;
unsigned exCounter: 2;
};
struct node {
std::atomic<T*> val;
std::atomic<nodeCnt> cnt;
cntPtr next;
node()
{
nodeCnt newCnt;
newCnt.inCnt = 0;
newCnt.exCounter = 2;
cnt.store(newCnt);
next.p = nullptr;
next.exCnt = 0;
}
};
public:
void push(T x)
{
std::unique_ptr<T> newData(new T(x));
cntPtr newCnt;
newCnt.p = new node;
newCnt.exCnt = 1;
cntPtr oldTail = tail.load();
for (;;)
{
increaseExCnt(tail, oldTail);
T* oldData = nullptr;
if (oldTail.p->val.compare_exchange_strong(oldData, newData.get()))
{
oldTail.p->next = newCnt;
oldTail = tail.exchange(newCnt);
freeExCnt(oldTail);
newData.release();
break;
}
oldTail.p->releaseRef();
}
}
};
使用引用计数的 pop:
template<typename T>
class A {
struct node {
void releaseRef()
{
nodeCnt oldCnt = cnt.load(std::memory_order_relaxed);
nodeCnt newCnt;
do {
newCnt = oldCnt;
--newCnt.inCnt;
} while (!cnt.compare_exchange_strong(oldCnt, newCnt,
std::memory_order_acquire, std::memory_order_relaxed));
if (!newCnt.inCnt && !newCnt.exCounter)
{
delete this;
}
}
};
public:
std::unique_ptr<T> pop()
{
cntPtr oldHead = head.load(std::memory_order_relaxed);
for (;;)
{
increaseExCnt(head, oldHead);
node* const p = oldHead.p;
if (p == tail.load().p)
{
p->releaseRef();
return std::unique_ptr<T>();
}
if (head.compare_exchange_strong(oldHead, p->next))
{
T* const res = p->val.exchange(nullptr);
freeExCnt(oldHead);
return std::unique_ptr<T>(res);
}
p->releaseRef();
}
}
private:
static void increaseExCnt(std::atomic<cntPtr>& counter, cntPtr& oldCounter)
{
cntPtr newCounter;
do {
newCounter = oldCounter;
++newCounter.exCnt;
} while (!counter.compare_exchange_strong(oldCounter, newCounter,
std::memory_order_acquire,std::memory_order_relaxed));
oldCounter.exCnt = newCounter.exCnt;
}
static void freeExCnt(cntPtr &oldNodePtr)
{
node* const p = oldNodePtr.p;
const int increaseCount = oldNodePtr.exCnt - 2;
nodeCnt oldCounter = p->cnt.load(std::memory_order_relaxed);
nodeCnt newCounter;
do {
newCounter = oldCounter;
--newCounter.exCounter;
newCounter.inCnt += increaseCount;
} while (!p->cnt.compare_exchange_strong(oldCounter, newCounter,
std::memory_order_acquire,std::memory_order_relaxed));
if (!newCounter.inCnt && !newCounter.exCounter)
{
delete p;
}
}
};
现在没有竞争,但还有一个性能问题,一旦一个线程执行 push 的
compare_exchange_strong
成功,其他线程看到的都是新值而非
nullptr,这会导致其他线程
compare_exchange_strong
失败并重新循环,造成的 busy wait就相当于锁,因此代码也就不算 lock-free
为了解决此问题,必须让等待的线程有所进展,即使 push
线程被挂起。一个方法是替代被挂起线程完成其工作。