风险指针的巧妙运用

简介

术语“风险指针”是指Maged Michael发明的一种技法, 后来被IBM申请为专利。
前文我们设计了无锁并发栈的结构,对于pop操作回收节点采用的是延时删除的策略,即将要删除的节点放入待删除列表中。
但是待删列表中的节点可能永远不会被回收,因为每次多个线程pop就不会触发回收待删列表的操作。上一节我们说可以通过执行pop的最后一个线程执行回收,那为了实现这个目的,我们就要换一种思路。就是我们将要删除的节点做特殊处理,如果有线程使用它,就将他标记为正在使用,那么这个节点的指针就是风险指针,也就是不能被其他线程删除。

改进pop

我们假设有一块全局区域存储的是一个风险数组,数组里放的就是风险指针。
我们提供一个函数去设置数组中某个节点为风险指针,没被设置为风险指针的节点就是可用的。
然后我们再提供一个函数去查找数组中的节点,返回一个可用的节点。
假设查找的函数叫做get_hazard_pointer_for_current_thread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::shared_ptr<T> pop()
{
// 1
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load(); // ⇽--- 2
node* temp;
do
{
temp=old_head;
hp.store(old_head); // ⇽--- 3
old_head=head.load();
} while(old_head!=temp); // ⇽--- 4
// ...
}

1 处获取数组中可用节点指针,用hp存储节点指针的引用。

2 处加载当前头节点保存到old节点里。

3 处将old_head的值赋值给hp。

4 while循环处的作用就是防止多个线程访问pop函数,某一个线程B将head修改,那说明他已经将它的临时变量old节点放入风险数组中,而本线程A的old节点和线程B的old节点指向的是同一个旧有的head,所以线程A就没必要将这个old节点放入风险数组了,需要再次循环获取新的head加载为old节点,再放入风险数组。

我们实现一个完整意义的pop操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
std::shared_ptr<T> pop()
{
//1 从风险列表中获取一个节点给当前线程
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load();
do
{
node* temp;
do
{
temp=old_head;
hp.store(old_head);
old_head=head.load();
}//2 如果old_head和temp不等说明head被其他线程更新了,需重试
while(old_head!=temp);
}//3 将当前head更新为old_head->next,如不满足则重试
while(old_head&&
!head.compare_exchange_strong(old_head,old_head->next));
// 4一旦更新了head指针,便将风险指针清零
hp.store(nullptr);
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);
//5 删除旧有的头节点之前,先核查它是否正被风险指针所指涉
if(outstanding_hazard_pointers_for(old_head))
{
//6 延迟删除
reclaim_later(old_head);
}
else
{
//7 删除头部节点
delete old_head;
}
//8 删除没有风险的节点
delete_nodes_with_no_hazards();
}
return res;
}

1 我们观察1处代码从全局的风险数组中jiu获取一个可用的节点作为风险节点。这样其他线程在回收节点的时候会从这个数组中看到这个风险节点,进而不会删除该节点。

2 处代码做循环比较是为了hp存储的节点为当前线程操作的head,如果其它线程更新了head,那么当前线程就要进行重试。以保证每个线程的hp存储的都是自己看到的最新的head。

3 处将head和old_head做比较,如果相等则更新head为old_head->next的值。如果不等则再次重复循环,因为有可能有多个线程都满足2处的条件,多个线程的hp和old_head指向相同,所以要重试,保证多线程情况下head的移动唯一。

4 一旦更新了head指针,就可以将这个风险指针清零了,因为其他线程pop操作的head已经不是我们hp存储的old_head了。所以此种情况下是线程安全的。

5 删除旧节点之前,先看它是否被风险指针所指涉。

6 如果要删除的节点被风险指针指涉,则延迟删除,放入待删列表。都则直接删除该节点即可。(7处)

8 每个线程pop后,都要查一下待删列表,将其中没有风险的节点删除。

接下来我们实现从全局连表中返回一个可用的节点

1
2
3
4
5
std::atomic<void*>& get_hazard_pointer_for_current_thread() {
//每个线程都具有自己的风险指针 线程本地变量
thread_local static hp_owner hazzard;
return hazzard.get_pointer();
}

我们通过一个线程本地变量hazzard存储当前线程正在使用的节点,这个节点被称作风险节点。其他线程不能删除。

接下来我们实现hazard_pointer类,管理风险指针和线程id

1
2
3
4
struct hazard_pointer {
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};

id为正在使用该风险指针的id,pointer为指针类型,存储的节点数据地址。
当一个线程从风险数组中查找某个闲置节点作为风险节点,则需要将pointer指向节点的数据,并且将id设置为当前的线程id。

我们定义一个全局的风险节点数组,用来存储风险节点。

1
hazard_pointer hazard_pointers[max_hazard_pointers];

然后我们用hp_owner类管理这个风险指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class hp_owner {
public:
hp_owner(hp_owner const&) = delete;
hp_owner operator=(hp_owner const&) = delete;
hp_owner():hp(nullptr){
for (unsigned i = 0; i < max_hazard_pointers; ++i) {
std::thread::id old_id;
if (hazard_pointers[i].id.compare_exchange_strong(old_id, std::this_thread::get_id())) {
hp = &hazard_pointers[i];
break;
}
}

if (!hp) {
throw std::runtime_error("No hazard pointers available");
}
}

~hp_owner() {
hp->pointer.store(nullptr);
hp->id.store(std::thread::id());
}
private:
hazard_pointer* hp;
};

每个线程每次调用get_hazard_pointer_for_current_thread只会在第一次的时候构造hp_owner类型的hazzard,之后该线程再次调用该函数不会构造hp_owner,因为是线程本地变量。

当一个线程析构的时候会释放其本地变量hazzard,进而执行hp_owner析构函数,从而恢复初值。

接下来我们要实现hp_owner的get_pointer函数。

1
2
3
std::atomic<void*>& get_pointer() {
return hp->pointer;
}

hp_owner 的get_pointer函数返回其成员pointer指向的地址

接下来我们实现判断该节点是否被风险指针所指涉的函数

1
2
3
4
5
6
7
8
9
10
11
bool outstanding_hazard_pointers_for(void* p)
{
for (unsigned i = 0; i < max_hazard_pointers; ++i)
{
if (hazard_pointers[i].pointer.load() == p)
{
return true;
}
}
return false;
}

如果当前节点被风险指针所指涉则将该节点放入待删队列延迟删除

1
2
3
void reclaim_later(node* old_head) {
add_to_reclaim_list(new data_to_reclaim(old_head));
}

将节点放入待删列表,我们封装了一个data_to_reclaim类型的节点放入待删列表。

我们定义待删节点的结构体

1
2
3
4
5
6
7
8
9
10
//待删节点
struct data_to_reclaim {
node* data;
std::function<void(node*)> deleter;
data_to_reclaim* next;
data_to_reclaim(node * p):data(p), next(nullptr){}
~data_to_reclaim() {
delete data;
}
};

然后在无锁栈中定义一个节点表示待删列表的首节点,因为栈是被多个线程操作的,待删列表也会被多个线程访问,那么我们需要用原子变量表示这个首节点

1
std::atomic<data_to_reclaim*>  nodes_to_reclaim;

我们实现将节点放入待删列表的逻辑

1
2
3
4
void add_to_reclaim_list(data_to_reclaim* reclaim_node) {
reclaim_node->next = nodes_to_reclaim.load();
while (!nodes_to_reclaim.compare_exchange_weak(reclaim_node->next, reclaim_node));
}

因为可能有多个线程同时将节点放入待删列表,所以此处做重试。
接下来实现从待删列表中删除无风险的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
void delete_nodes_with_no_hazards() {
data_to_reclaim* current = nodes_to_reclaim.exchange(nullptr);
while (current) {
data_to_reclaim* const next = current->next;
if (!outstanding_hazard_pointers_for(current->data)) {
delete current;
}
else {
add_to_reclaim_list(current);
}
current = next;
}
}

测试

我们依然用之前的方式测试,启动三个线程t1用来向栈中放入数据(总计push20000个), t2和t3用来从栈中pop数据。用一个set记录删除的值,最后判断总共删除的数量是否为2000.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void TestHazardPointer() {
hazard_pointer_stack<int> hazard_stack;
std::set<int> rmv_set;
std::mutex set_mtx;

std::thread t1([&]() {
for (int i = 0; i < 20000; i++) {
hazard_stack.push(i);
std::cout << "push data " << i << " success!" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
});

std::thread t2([&]() {
for (int i = 0; i < 10000;) {
auto head = hazard_stack.pop();
if (!head) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
std::lock_guard<std::mutex> lock(set_mtx);
rmv_set.insert(*head);
std::cout << "pop data " << *head << " success!" << std::endl;
i++;
}
});

std::thread t3([&]() {
for (int i = 0; i < 10000;) {
auto head = hazard_stack.pop();
if (!head) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
std::lock_guard<std::mutex> lock(set_mtx);
rmv_set.insert(*head);
std::cout << "pop data " << *head << " success!" << std::endl;
i++;
}
});

t1.join();
t2.join();
t3.join();

assert(rmv_set.size() == 20000);
}

优劣分析

风险指针的机制能保证要删除的节点在合理的时机回收,但是也引发了一些性能问题,比如为了删除某个节点要遍历风险数组判断该节点是否被风险指针所指涉。其次我们对于要删除的节点需要从风险数组中选择一个合适的节点记录其地址,所以也需要便利。
C++ 并发编程一书中提出了用空间换取时间和性能的办法,就是开辟2*N个大小的风险数组,只有当使用的节点达到N个时我们才依次判断N个节点是否被风险指针所指涉,这样我i们减少了判断回收的次数。但同样增加内存的开销。
另外作者也提及了风险指针为IBM的技术专利,即使我们懂得这种方法也不见得有权利使用,为后文提及了引用计数做了铺垫。

源码链接:

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day17-LockFreeStack

视频链接:

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290