恋恋风辰的个人博客


  • Home

  • Archives

  • Categories

  • Tags

  • Search

无锁并发的应用(无锁队列)

Posted on 2023-11-02 | In C++

简介

前文介绍了如何通过内存顺序实现内存模型,本文基于前文的基础,利用内存顺序和内存模型的知识,带着大家探索无锁并发的应用,主要是通过无锁队列的实现来让大家熟悉无锁并发的实现方式。

环形队列

我们要实现无锁并发,经常会用到一种结构无锁队列,而无锁队列和我们经常使用的队列颇有不同,它采用的是环状的队列结构,为什么成环呢?主要有两个好处,一个是成环的队列大小是固定的,另外一个我们通过移动头和尾就能实现数据的插入和取出。

我们看下图是一个环形队列的基本结构

https://cdn.llfc.club/4a6ee05475ca071cc608c9eb35920af.png

图1表示队列为空的时候,头节点和尾节点交会在一起,指向同一个扇区。

图2表示当我们你插入一个数字1后,队列大小为1,此时tail指针移动到下一个扇区,head指向头部,1被存储在头部了。

图3表示当我们将数字1出队后,head指针向后移动一个扇区,此时head和tail指向同一个扇区,表示队列又为空了。那有人会问队列中数字1为什么不清空呢?其实不用清空,因为当我们插入新数据时就可以覆盖掉1这个无效的数据。

比如我们继续3图,连续插入几个数字,将队列填满。

https://cdn.llfc.club/1698926107471.jpg

图4说明的就是当我们连续插入了几个数字,插入数据9的时候将原来1的数据覆盖了,所以环形队列删除数据的时候我们不用让数据出队,只要移动head指针即可。

另外我们从图4也能看出,此时tail指向的位置正好是head的前一个位置,这种情况表示队列满了。

用锁实现环形队列

我们可以用锁实现上述环形队列,在push和pop时分别加锁,并通过head和tail计算队列是否为满或者空。

代码比较简单,可以看看下面的写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <iostream>
#include <mutex>
#include <memory>

template<typename T, size_t Cap>
class CircularQueLk :private std::allocator<T> {
public:
CircularQueLk() :_max_size(Cap + 1),_data(std::allocator<T>::allocate(_max_size)), _head(0), _tail(0) {}
CircularQueLk(const CircularQueLk&) = delete;
CircularQueLk& operator = (const CircularQueLk&) volatile = delete;
CircularQueLk& operator = (const CircularQueLk&) = delete;

~CircularQueLk() {
//循环销毁
std::lock_guard<std::mutex> lock(_mtx);
//调用内部元素的析构函数
while (_head != _tail) {
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)% _max_size;
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size);
}

//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) {
std::lock_guard<std::mutex> lock(_mtx);
//判断队列是否满了
if ((_tail + 1) % _max_size == _head) {
std::cout << "circular que full ! " << std::endl;
return false;
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;
return true;
}

//push 实现两个版本,一个接受左值引用,一个接受右值引用

//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val);
}

//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val));
}

//出队函数
bool pop(T& val) {
std::lock_guard<std::mutex> lock(_mtx);
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) {
std::cout << "circular que empty ! " << std::endl;
return false;
}
//取出头部指针指向的数据
val = std::move(_data[_head]);
//更新头部指针
_head = (_head + 1) % _max_size;
return true;
}
private:
size_t _max_size;
T* _data;
std::mutex _mtx;
size_t _head = 0;
size_t _tail = 0;
};

测试也比较简单,我们写一个函数,初始化队列大小为5,测试队列push满的情况和pop直到为空的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TestCircularQue() {
//最大容量为10
CircularQueLk<MyClass, 5> cq_lk;
MyClass mc1(1);
MyClass mc2(2);
cq_lk.push(mc1);
cq_lk.push(std::move(mc2));
for (int i = 3; i <= 5; i++) {
MyClass mc(i);
auto res = cq_lk.push(mc);
if (res == false) {
break;
}
}

cq_lk.push(mc2);

for (int i = 0; i < 5; i++) {
MyClass mc1;
auto res = cq_lk.pop(mc1);
if (!res) {
break;
}
std::cout << "pop success, " << mc1 << std::endl;
}

auto res = cq_lk.pop(mc1);
}

结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
called push const T& version
called push T&& version
called push const T& version
called push const T& version
called push const T& version
called push const T& version
circular que full !

pop success, MyClass Data is 1
pop success, MyClass Data is 2
pop success, MyClass Data is 3
pop success, MyClass Data is 4
pop success, MyClass Data is 5
circular que empty !

无锁队列

那如果我们用原子变量而不是用锁实现环形队列,那就是无锁并发的队列了。还记得我们之前提到的原子变量的读改写操作吗?

1
2
bool std::atomic<T>::compare_exchange_weak(T &expected, T desired);
bool std::atomic<T>::compare_exchange_strong(T &expected, T desired);

compare_exchange_strong会比较原子变量atomic<T>的值和expected的值是否相等,如果相等则执行交换操作,将atomic<T>的值换为desired并且返回true,否则将expected的值修改为bool变量的值,并且返回false.

其伪代码可以这么理解

1
2
3
4
5
6
7
8
template <typename T>
bool atomic<T>::compare_exchange_strong(T &expected, T desired) {
std::lock_guard<std::mutex> guard(m_lock);
if (m_val == expected)
return m_val = desired, true;
else
return expected = m_val, false;
}

compare_exchange_weak功能比compare_exchange_strong弱一些,他不能保证atomic<T>的值和expected的值相等时也会做交换,很可能原子变量和预期值相等也会返回false,所以使用要多次循环使用。

我们们定义一个类CircularQueSeq, 其内容和之前我们定义的类CircularQueLk差不多,只不过将类的成员变量mutex换成atomic类型的原子变量, 我们可以利用自旋锁的思路将锁替换为原子变量循环检测的方式,进而达到锁住互斥逻辑的效果。

大家可以先看一下全部的代码感受一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
template<typename T, size_t Cap>
class CircularQueSeq :private std::allocator<T> {
public:
CircularQueSeq() :_max_size(Cap + 1), _data(std::allocator<T>::allocate(_max_size)), _atomic_using(false),_head(0), _tail(0) {}
CircularQueSeq(const CircularQueSeq&) = delete;
CircularQueSeq& operator = (const CircularQueSeq&) volatile = delete;
CircularQueSeq& operator = (const CircularQueSeq&) = delete;

~CircularQueSeq() {
//循环销毁
bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//调用内部元素的析构函数
while (_head != _tail) {
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)% _max_size;
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size);

do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
}

//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) {

bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));

//判断队列是否满了
if ((_tail + 1) % _max_size == _head) {
std::cout << "circular que full ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;

do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));

return true;
}

//push 实现两个版本,一个接受左值引用,一个接受右值引用

//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val);
}

//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val));
}

//出队函数
bool pop(T& val) {

bool use_expected = false;
bool use_desired = true;
do
{
use_desired = true;
use_expected = false;
} while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) {
std::cout << "circular que empty ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//取出头部指针指向的数据
val = std::move(_data[_head]);
//更新头部指针
_head = (_head + 1) % _max_size;

do
{
use_expected = true;
use_desired = false;
}while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return true;
}
private:
size_t _max_size;
T* _data;
std::atomic<bool> _atomic_using;
size_t _head = 0;
size_t _tail = 0;
};

我们可以看到emplace函数以及pop函数等将锁替换为原子变量。采用do while的方式就是因为compare_exchange_strong比较原子变量和use_expected的值不同的时候会使use_expected改变,所以我们需要在再次循环之前重置use_expected和use_desired的值。

我们可以写一个函数在单线程情况下下测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
void TestCircularQueSeq()
{
CircularQueSeq<MyClass, 3> cq_seq;
for(int i = 0; i < 4; i++)
{
MyClass mc1(i);
auto res = cq_seq.push(mc1);
if(!res)
{
break;
}
}

for(int i = 0; i < 4; i++)
{
MyClass mc1;
auto res = cq_seq.pop(mc1);
if(!res)
{
break;
}

std::cout << "pop success, " << mc1 << std::endl;
}

for (int i = 0; i < 4; i++)
{
MyClass mc1(i);
auto res = cq_seq.push(mc1);
if (!res)
{
break;
}
}

for (int i = 0; i < 4; i++)
{
MyClass mc1;
auto res = cq_seq.pop(mc1);
if (!res)
{
break;
}

std::cout << "pop success, " << mc1 << std::endl;
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
called push const T& version
called push const T& version
called push const T& version
called push const T& version
circular que full !
pop success, MyClass Data is 0
pop success, MyClass Data is 1
pop success, MyClass Data is 2
circular que empty !
called push const T& version
called push const T& version
called push const T& version
called push const T& version
circular que full !
pop success, MyClass Data is 0
pop success, MyClass Data is 1
pop success, MyClass Data is 2
circular que empty !

多线程情况下也能保证安全是因为原子变量循环检测保证有且只有一个线程修改成功。读取也是这样。

单一原子变量的弊端

我们考虑上述单一原子变量的弊端

多个线程push和pop操作耦合读太高,同一时刻仅有一个线程pop或者push,而且互斥逻辑的精度不够。影响效率。

我们需要考虑将pop和push操作解耦,我们采用的是环形队列,将tail和head作为原子变量可以实现精细控制。

比如我们做push操作的时候,一个线程更新万tail标签和数据后,其他线程就可以pop或者push了,精细控制的好处就是效率提升。

我们定义一个新的类CircularQueLight,类的基本数据结构和CircularQueSeq差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
template<typename T, size_t Cap>
class CircularQueLight: private std::allocator<T>
{
public:
CircularQueLight():_max_size(Cap + 1),
_data(std::allocator<T>::allocate(_max_size))
, _head(0), _tail(0) {}

CircularQueLight(const CircularQueLight&) = delete;
CircularQueLight& operator = (const CircularQueLight&) volatile = delete;
CircularQueLight& operator = (const CircularQueLight&) = delete;
private:
size_t _max_size;
T* _data;
std::atomic<size_t> _head;
std::atomic<size_t> _tail;
};

我们将_head 和_tail 替换为原子变量。

接下来我们考虑pop逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool pop(T& val) {

size_t h;
do
{
h = _head.load(); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if(h == _tail.load())
{
return false;
}
val = _data[h]; // 2处

} while (!_head.compare_exchange_strong(h,
(h+1)% _max_size)); //3 处

return true;
}

在pop逻辑里我们在1处load获取头部head的值,在2处采用了复制的方式将头部元素取出赋值给val,而不是通过std::move,因为多个线程同时pop最后只有一个线程成功执行3处代码退出,而失败的则需要继续循环,从更新后的head处pop元素。所以不能用std::move,否则会破坏原有的队列数据。

接下来我们来做push的函数逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
   bool push(T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}

_data[t] = val; //2

} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3

return true;
}

push函数的逻辑乍一看和pop一样,但是我们会发现多线程push的情况存在线程安全问题。

比如我们线程1 push(1) 而线程2 push(2). 很有可能的顺序是

1.1 -> 1.2 -> 2.1 -> 2.2 -> 1.3

这样我们看到的效果就是_data[t]被存储为2了,而实际情况应该是被存储为1,因为线程1的原子变量生效,而线程2的原子变量不满足需继续循环。所以_data[t]必须修改为1.

那我们改进一下push的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
 bool push(T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}



} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3

_data[t] = val; //2

return true;
}

我们将2处的代码移动到循环之外,这样能保证多个线程push,仅有一个线程生效时,他写入的数据一定是本线程要写入到tail的数据,而此时tail被缓存在t里,那是一个线程本地变量,所以在这种情况下我们能确定即使多个线程运行到2处,他们的t值也是不同的,并不会产生线程安全问题。

毕竟多个线程push数据时对资源的竞争仅限tail。

但是这种push操作仍然会有安全问题

我们思考这种情况

https://cdn.llfc.club/1699154222424.jpg

此时head和tail都指向1这个位置,当我们执行push(9)时,按照我们的逻辑会先执行3再执行2.

也就是会先将tail移动,然后更新1的值为9.

那如果我们更新了tail之后,还没来的及更新1为9,那么此时如果有其他的线程读取head的值,会读取到1,而不是9.

从多线程安全角度来讲这是不安全的,我们理想的情况是一个线程写完数据后另一个线程读取的就是之前写入的最新值。

为了解决这个问题,我们可以增加另一个原子变量_tail_update来标记尾部数据是否修改完成,如果尾部数据没有修改完成,此时其他线程pop时获取的数据就是不安全的,所以pop要返回false。

先实现push版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   bool push(const T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}



} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3

_data[t] = val; //2
size_t tailup;
do
{
tailup = t;

} while (_tail_update.compare_exchange_strong(tailup,
(tailup + 1) % _max_size));
return true;
}

再实现pop版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool pop(T& val) {

size_t h;
do
{
h = _head.load(); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if(h == _tail.load())
{
return false;
}

//判断如果此时要读取的数据和tail_update是否一致,如果一致说明尾部数据未更新完
if(h == _tail_update.load())
{
return false;
}
val = _data[h]; // 2处

} while (!_head.compare_exchange_strong(h,
(h+1)% _max_size)); //3 处

return true;
}

pop版本也是,先判断队列是否为空,再判断h是否和_tail_update的值相等,如果相等说明有写数据的没更新完,直接返回false或者循环等待也行,为了方便我们直接返回false即可。

因为我们知道原子操作默认采用的是memory_order_seq_cst内存顺序,性能上不是最优的,我们可以用acquire和release的内存顺序实现同步的效果。

优化性能

我们用acquire和release模型优化上述代码,实现同步。
最简单的方式就是将load的地方变为memory_order_relaxed,compare_exchange_strong的地方变为memory_order_release

我们先看pop操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool pop(T& val) {

size_t h;
do
{
h = _head.load(std::memory_order_relaxed); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if (h == _tail.load(std::memory_order_acquire)) //2处
{
std::cout << "circular que empty ! " << std::endl;
return false;
}

//判断如果此时要读取的数据和tail_update是否一致,如果一致说明尾部数据未更新完
if (h == _tail_update.load(std::memory_order_acquire)) //3处
{
return false;
}
val = _data[h]; // 2处

} while (!_head.compare_exchange_strong(h,
(h + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //4 处
std::cout << "pop data success, data is " << val << std::endl;
return true;
}

1 处为memory_order_relaxed是因为即使多个线程pop,每个线程获取的head可能不及时,这个没关系,因为我们有4处的while来重试。

2 compare_exchange_strong操作,在期望的条件匹配时采用memory_order_release, 期望的条件不匹配时memory_order_relaxed可以提升效率,毕竟还是要重试的。

我们再看push 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
bool push(const T& val)
{
size_t t;
do
{
t = _tail.load(std::memory_order_relaxed); //5
//判断队列是否满
if ((t + 1) % _max_size == _head.load(std::memory_order_acquire))
{
std::cout << "circular que full ! " << std::endl;
return false;
}



} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //6

_data[t] = val;
size_t tailup;
do
{
tailup = t;

} while (_tail_update.compare_exchange_strong(tailup,
(tailup + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //7

std::cout << "called push data success " << val << std::endl;
return true;
}

两个线程协同工作,一个线程先push,另一个线程后pop,那么对于tail部分和_tail_update,我们要保证push的结果_data[t] = val;先于pop的结果val = _data[h];

所以push线程中对于_tail_update的compare_exchange_strong操作采用memory_order_release方式。

pop线程对于_tail_update的load操作采用memory_order_acquire。

如果一个线程先pop,另一个线程先push,那么对于head部分,我们要保证pop的结果val = _data[h];先于pop的结果_data[t] = val;。

思考

优势

无锁高并发. 虽然存在循环重试, 但是这只会在相同操作并发的时候出现. push 不会因为与 pop 并发而重试, 反之亦然.

缺陷

这样队列只应该存储标量, 如果存储类对象时,多个push线程只有一个线程push成功,而拷贝复制的开销很大,其他线程会循环重试,每次重试都会有开销。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day12-circularque

用内存顺序实现内存模型

Posted on 2023-10-25 | In C++

前情回顾

前文我们介绍了六种内存顺序,以及三种内存模型,本文通过代码示例讲解六种内存顺序使用方法,并实现相应的内存模型。

memory_order_seq_cst

memory_order_seq_cst代表全局一致性顺序,可以用于 store, load 和 read-modify-write 操作, 实现 sequencial consistent 的顺序模型. 在这个模型下, 所有线程看到的所有操作都有一个一致的顺序, 即使这些操作可能针对不同的变量, 运行在不同的线程.

我们看一下之前写的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // 1
y.store(true, std::memory_order_relaxed); // 2
}

void read_y_then_x() {
while (!y.load(std::memory_order_relaxed)) { // 3
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_relaxed)) { //4
++z;
}

}

void TestOrderRelaxed() {

std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}

上面的代码load和store都采用的是memory_order_relaxed。线程t1按次序执行1和2,但是线程t2看到的可能是y为true,x为false。进而导致TestOrderRelaxed触发断言z为0.
如果换成memory_order_seq_cst则能保证所有线程看到的执行顺序是一致的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

void write_x_then_y() {
x.store(true, std::memory_order_seq_cst); // 1
y.store(true, std::memory_order_seq_cst); // 2
}

void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)) { // 3
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_seq_cst)) { //4
++z;
}

}

void TestOrderSeqCst() {

std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}

上面的代码x和y采用的是memory_order_seq_cst, 所以当线程t2执行到3处并退出循环时我们可以断定y为true,因为是全局一致性顺序,所以线程t1已经执行完2处将y设置为true,那么线程t1也一定执行完1处代码并对t2可见,所以当t2执行至4处时x为true,那么会执行z++保证z不为零,从而不会触发断言。

实现 sequencial consistent 模型有一定的开销. 现代 CPU 通常有多核, 每个核心还有自己的缓存. 为了做到全局顺序一致, 每次写入操作都必须同步给其他核心. 为了减少性能开销, 如果不需要全局顺序一致, 我们应该考虑使用更加宽松的顺序模型.

memory_order_relaxed

memory_order_relaxed 可以用于 store, load 和 read-modify-write 操作, 实现 relaxed 的顺序模型.
前文我们介绍过这种模型下, 只能保证操作的原子性和修改顺序 (modification order) 一致性, 无法实现 synchronizes-with 的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TestOrderRelaxed() {
std::atomic<bool> rx, ry;

std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_relaxed); // 2
});


std::thread t2([&]() {
while (!ry.load(std::memory_order_relaxed)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});

t1.join();
t2.join();
}

上面的代码在一定程度上会触发断言。因为线程t1执行完1,2之后,有可能2操作的结果先放入内存中被t2看到,此时t2执行退出3循环进而执行4,此时t2看到的rx值为false触发断言。

我们称2和3不构成同步关系, 2 “ not synchronizes with “ 3

如果能保证2的结果立即被3看到, 那么称 2 “synchronizes with “ 3。

如果2 同步于 3还有一层意思就是 如果在线程t1 中 1 先于 2(sequence before), 那么 1先行于3。那我们可以理解t2执行到3处时,可以获取到t1执行1操作的结果,也就是rx为true.

t2线程中3先于4(sequence before),那么1 操作先行于 4. 也就是1 操作的结果可以立即被4获取。进而不会触发断言。

怎样保证2 同步于 3 是解决问题的关键, 我们引入 Acquire-Release 内存顺序。

Acquire-Release

在 acquire-release 模型中, 会使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel 这三种内存顺序. 它们的用法具体是这样的:

对原子变量的 load 可以使用 memory_order_acquire 内存顺序. 这称为 acquire 操作.

对原子变量的 store 可以使用 memory_order_release 内存顺序. 这称为 release 操作.

read-modify-write 操作即读 (load) 又写 (store), 它可以使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel:

  1. 如果使用 memory_order_acquire, 则作为 acquire 操作;
  2. 如果使用 memory_order_release, 则作为 release 操作;
  3. 如果使用 memory_order_acq_rel, 则同时为两者.

Acquire-release 可以实现 synchronizes-with 的关系. 如果一个 acquire 操作在同一个原子变量上读取到了一个 release 操作写入的值, 则这个 release 操作 “synchronizes-with” 这个 acquire 操作.

我们可以通过Acquire-release 修正 TestOrderRelaxed函数以达到同步的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TestReleaseAcquire() {
std::atomic<bool> rx, ry;

std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_release); // 2
});


std::thread t2([&]() {
while (!ry.load(std::memory_order_acquire)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});

t1.join();
t2.join();
}

上面的例子中我们看到ry.store使用的是std::memory_order_release, ry.load使用的是std::memory_order_relaxed.

t1执行到2将ry 设置为true, 因为使用了Acquire-release 顺序, 所以 t2 执行到3时读取ry为true, 因此2和3 可以构成同步关系。

又因为单线程t1内 1 sequence before 2,所以1 happens-before 3.
因为单线程t2内 3 sequence before 4. 所以 1 happens-before 4.

可以断定4 不会触发断言。

我们从cpu结构图理解这一情景

https://cdn.llfc.club/1697539893049.jpg

到此大家一定要记住仅 Acquire-release能配合达到 synchronizes-with效果,再就是memory_order_seq_cst可以保证全局顺序唯一,其他情况的内存顺序都能保证顺序,使用时需注意。

Acquire-release 的开销比 sequencial consistent 小. 在 x86 架构下, memory_order_acquire 和 memory_order_release 的操作不会产生任何其他的指令, 只会影响编译器的优化: 任何指令都不能重排到 acquire 操作的前面, 且不能重排到 release 操作的后面; 否则会违反 acquire-release 的语义. 因此很多需要实现 synchronizes-with 关系的场景都会使用 acquire-release.

Release sequences

我们再考虑一种情况,多个线程对同一个变量release操作,另一个线程对这个变量acquire,那么只有一个线程的release操作喝这个acquire线程构成同步关系。

看下面的代码 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ReleasAcquireDanger2() {
std::atomic<int> xd{0}, yd{ 0 };
std::atomic<int> zd;

std::thread t1([&]() {
xd.store(1, std::memory_order_release); // (1)
yd.store(1, std::memory_order_release); // (2)
});

std::thread t2([&]() {
yd.store(2, std::memory_order_release); // (3)
});


std::thread t3([&]() {
while (!yd.load(std::memory_order_acquire)); //(4)
assert(xd.load(std::memory_order_acquire) == 1); // (5)
});

t1.join();
t2.join();
t3.join();
}

我们可以看到t3在yd为true的时候才会退出,那么导致yd为true的有两种情况,一种是1,另一种是2, 所以5处可能触发断言。

并不是只有在 acquire 操作读取到 release 操作写入的值时才能构成 synchronizes-with 关系. 为了说这种情况, 我们需要引入 release sequence 这个概念.

针对一个原子变量 M 的 release 操作 A 完成后, 接下来 M 上可能还会有一连串的其他操作. 如果这一连串操作是由

  1. 同一线程上的写操作
  2. 任意线程上的 read-modify-write 操作
    这两种构成的, 则称这一连串的操作为以 release 操作 A 为首的 release sequence. 这里的写操作和 read-modify-write 操作可以使用任意内存顺序.

如果一个 acquire 操作在同一个原子变量上读到了一个 release 操作写入的值, 或者读到了以这个 release 操作为首的 release sequence 写入的值, 那么这个 release 操作 “synchronizes-with” 这个 acquire 操作.

看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void ReleaseSequence() {
std::vector<int> data;
std::atomic<int> flag{ 0 };

std::thread t1([&]() {
data.push_back(42); //(1)
flag.store(1, std::memory_order_release); //(2)
});

std::thread t2([&]() {
int expected = 1;
while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) // (3)
expected = 1;
});

std::thread t3([&]() {
while (flag.load(std::memory_order_acquire) < 2); // (4)
assert(data.at(0) == 42); // (5)
});

t1.join();
t2.join();
t3.join();
}

我们考虑t3要想退出首先flag要等于2,那么就要等到t2将flag设置为2,而flag设置为2又要等到t1将flag设置为1. 所以我们捋一下顺序 2->3->4

t1中操作2是release操作,以2为开始,其他线程(t2)的读改写在release操作之后,我们称之为release sequence, t3要读取release sequence写入的值,所以我们称t1的release操作 “synchronizes with “ t3的 acquire 操作。

memory_order_consume

memory_order_consume 其实是 acquire-release 模型的一部分, 但是它比较特殊, 它涉及到数据间相互依赖的关系. 就是前文我们提及的 carries dependency和 dependency-ordered before.

我们复习一下

如果操作 a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries a dependency into” b. 一般来说, 如果 a 的值用作 b 的一个操作数, 或者 b 读取到了 a 写入的值, 都可以称为 b 依赖于 a

1
2
3
p++;   // (1)
i++; // (2)
p[i] // (3)

(1) “sequenced-before” (2), (2) “sequenced-before” (3), 而(1)和(2)的值作为(3)的下表运算符[]的操作数。

我们可以称(1) “carries a dependency into “ (3), (2) “carries a dependency into “ (3), 但是(1)和(2)不是依赖关系。

memory_order_consume 可以用于 load 操作. 使用 memory_order_consume 的 load 称为 consume 操作. 如果一个 consume 操作在同一个原子变量上读到了一个 release 操作写入的值, 或以其为首的 release sequence 写入的值, 则这个 release 操作 “dependency-ordered before” 这个 consume 操作.

看下面这个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ConsumeDependency() {
std::atomic<std::string*> ptr;
int data;

std::thread t1([&]() {
std::string* p = new std::string("Hello World"); // (1)
data = 42; // (2)
ptr.store(p, std::memory_order_release); // (3)
});

std::thread t2([&]() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // (4)
assert(*p2 == "Hello World"); // (5)
assert(data == 42); // (6)
});

t1.join();
t2.join();
}

t2执行到(4)处时,需要等到ptr非空才能退出循环,这就依赖t1执行完(3)操作。

因此(3) “dependency-ordered before” (4), 根据前文我们介绍了dependency等同于synchronizes ,所以(3) “inter-thread happens-before”. (4)

因为(2) “sequenced before” (3), 所以(2) “happens-before “ (4)

因为(4) “sequenced before” (5), 所以(2) “happens-before “ (5)

因为(5) “sequenced before” (6), 所以(2) “happens-before “ (6)

所以(6)处断言不会触发,同样的道理(5)处断言也不会触发。

单例模式改良

还记得我们之前用智能指针双重检测方式实现的单例模式吗?我当时说过是存在线程安全问题的,看看下面这段单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//利用智能指针解决释放问题
class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
// 1 处
if (single != nullptr)
{
return single;
}
// 2 处
s_mutex.lock();
// 3 处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

我们写一段代码测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;

void TestSingle() {
std::thread t1([]() {
std::cout << "thread t1 singleton address is 0X: " << SingleAuto::GetInst() << std::endl;
});

std::thread t2([]() {
std::cout << "thread t2 singleton address is 0X: " << SingleAuto::GetInst() << std::endl;
});

t2.join();
t1.join();
}

虽然可以正常输出两次的地址都是同一个,但是我们的单例会存在安全隐患。
1处和4处代码存在线程安全问题,因为4处代码在之前的文章中我谈过,new一个对象再赋值给变量时会存在多个指令顺序

第一种情况

1
2
3
1 为对象allocate一块内存空间
2 调用construct构造对象
3 将构造到的对象地址返回

第二种情况

1
2
3
1 为对象allocate一块内存空间
2 先将开辟的空间地址返回
3 调用construct构造对象

如果是第二种情况,在4处还未构造对象就将地址返回赋值给single,而此时有线程运行至1处判断single不为空直接返回单例实例,如果该线程调用这个单例的成员函数就会崩溃。

为了解决这个问题,我们可以通过内存模型来解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//利用智能指针解决释放问题
class SingleMemoryModel
{
private:
SingleMemoryModel()
{
}
SingleMemoryModel(const SingleMemoryModel&) = delete;
SingleMemoryModel& operator=(const SingleMemoryModel&) = delete;
public:
~SingleMemoryModel()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleMemoryModel> GetInst()
{
// 1 处
if (_b_init.load(std::memory_order_acquire))
{
return single;
}
// 2 处
s_mutex.lock();
// 3 处
if (_b_init.load(std::memory_order_relaxed))
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleMemoryModel>(new SingleMemoryModel);
_b_init.store(true, std::memory_order_release);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleMemoryModel> single;
static std::mutex s_mutex;
static std::atomic<bool> _b_init ;
};

std::shared_ptr<SingleMemoryModel> SingleMemoryModel::single = nullptr;
std::mutex SingleMemoryModel::s_mutex;
std::atomic<bool> SingleMemoryModel::_b_init = false;

然后我们测试

1
2
3
4
5
6
7
8
9
10
11
12
void TestSingleMemory() {
std::thread t1([]() {
std::cout << "thread t1 singleton address is 0x: " << SingleMemoryModel::GetInst() << std::endl;
});

std::thread t2([]() {
std::cout << "thread t2 singleton address is 0x: " << SingleMemoryModel::GetInst() << std::endl;
});

t2.join();
t1.join();
}

也可以看到输出的地址一致,但是我们这个改进的版本防止了线程安全问题。

总结

本文介绍了如何通过内存顺序实现内存模型,以及优化了单例模式。

源码链接

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day11-AcquireRelease

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

C++ 原子操作和内存模型

Posted on 2023-10-16 | In C++

简介

本文介绍C++ 内存模型相关知识,包含几种常见的内存访问策略。

改动序列

在一个C++程序中,每个对象都具有一个改动序列,它由所有线程在对象上的全部写操作构成,其中第一个写操作即为对象的初始化。
大部分情况下,这个序列会随程序的多次运行而发生变化,但是在程序的任意一次运行过程中,所含的全部线程都必须形成相同的改动序列。

改动序列基本要求如下

1 只要某线程看到过某个对象,则该线程的后续读操作必须获得相对新近的值,并且,该线程就同一对象的后续写操作,必然出现在改动序列后方。
2 如果某线程先向一个对象写数据,过后再读取它,那么必须读取前面写的值。
3 若在改动序列中,上述读写操作之间还有别的写操作,则必须读取最后写的值。
4 在程序内部,对于同一个对象,全部线程都必须就其形成相同的改动序列,并且在所有对象上都要求如此.
5 多个对象上的改动序列只是相对关系,线程之间不必达成一致

原子类型

标准原子类型的定义位于头文件<atomic>内。我们可以通过atomic<>定义一些原子类型的变量,如atomic<bool>,atomic<int> 这些类型的操作全是原子化的。

从C++17开始,所有的原子类型都包含一个静态常量表达式成员变量,std::atomic::is_always_lock_free。这个成员变量的值表示在任意给定的目标硬件上,原子类型X是否始终以无锁结构形式实现。如果在所有支持该程序运行的硬件上,原子类型X都以无锁结构形式实现,那么这个成员变量的值就为true;否则为false。

只有一个原子类型不提供is_lock_free()成员函数:std::atomic_flag 。类型std::atomic_flag的对象在初始化时清零,随后即可通过成员函数test_and_set()查值并设置成立,或者由clear()清零。整个过程只有这两个操作。其他的atomic<>的原子类型都可以基于其实现。

std::atomic_flag的test_and_set成员函数是一个原子操作,他会先检查std::atomic_flag当前的状态是否被设置过,

1 如果没被设置过(比如初始状态或者清除后),将std::atomic_flag当前的状态设置为true,并返回false。

2 如果被设置过则直接返回ture。

对于std::atomic<T>类型的原子变量,还支持load()和store()、exchange()、compare_exchange_weak()和compare_exchange_strong()等操作。

内存次序

对于原子类型上的每一种操作,我们都可以提供额外的参数,从枚举类std::memory_order取值,用于设定所需的内存次序语义(memory-ordering semantics)。

枚举类std::memory_order具有6个可能的值,

包括std::memory_order_relaxed、std:: memory_order_acquire、std::memory_order_consume、

std::memory_order_acq_rel、std::memory_order_release和 std::memory_order_seq_cst。

存储(store)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_release或std::memory_order_seq_cst。

载入(load)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_consume、std::memory_order_acquire或std::memory_order_seq_cst。

“读-改-写”(read-modify-write)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_consume、std::memory_order_acquire、std::memory_order_release、std::memory_order_acq_rel或std::memory_order_seq_cst。

原子操作默认使用的是std::memory_order_seq_cst次序。

这六种内存顺序相互组合可以实现三种顺序模型 (ordering model)

Sequencial consistent ordering. 实现同步, 且保证全局顺序一致 (single total order) 的模型. 是一致性最强的模型, 也是默认的顺序模型.
Acquire-release ordering. 实现同步, 但不保证保证全局顺序一致的模型.
Relaxed ordering. 不能实现同步, 只保证原子性的模型.

实现自旋锁

自旋锁是一种在多线程环境下保护共享资源的同步机制。它的基本思想是,当一个线程尝试获取锁时,如果锁已经被其他线程持有,那么该线程就会不断地循环检查锁的状态,直到成功获取到锁为止。

那我们用这个std:atomic_flag实现一个自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <atomic>
#include <thread>

class SpinLock {
public:
void lock() {
//1 处
while (flag.test_and_set(std::memory_order_acquire)); // 自旋等待,直到成功获取到锁
}

void unlock() {
//2 处
flag.clear(std::memory_order_release); // 释放锁
}

private:
std::atomic_flag flag = ATOMIC_FLAG_INIT;
};

我们实现一个测试函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void TestSpinLock() {
SpinLock spinlock;
std::thread t1([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "*";
}
std::cout << std::endl;
spinlock.unlock();
});


std::thread t2([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "?";
}
std::cout << std::endl;
spinlock.unlock();
});


t1.join();
t2.join();
}

在主函数执行上述代码会看到如下输出

1
2
***
???

1 处 在多线程调用时,仅有一个线程在同一时刻进入test_and_set,因为atomic_flag初始状态为false,所以test_and_set将atomic_flag设置为true,并且返回false。

比如线程A调用了test_and_set返回false,这样lock函数返回,线程A继续执行加锁区域的逻辑。此时线程B调用test_and_set,test_and_set会返回true,导致线程B在while循环中循环等待,达到自旋检测标记的效果。当线程A直行至2处调用clear操作后,atomic_flag被设置为清空状态,线程B调用test_and_set会将状态设为成立并返回false,B线程执行加锁区域的逻辑。

我们看到在设置时使用memory_order_acquire内存次序,在清除时使用了memory_order_release内存次序。

宽松内存序

为了给大家介绍不同的字节序,我们先从最简单的字节序std::memory_order_relaxed(宽松字节序)介绍。
因为字节序是为了实现改动序列的,所以为了理解字节序还要结合改动序列讲起。

我们先看一个CPU和内存结构图

https://cdn.llfc.club/1697539893049.jpg

其中StoreBuffer就是一级Cache, Catche是二级Cache,Memory是三级Cache。

每个标识CPU的块就是core,上图画的就是4核结构。每两个core构成一个bank,共享一个cache。四个core共享memory。

每个CPU所作的store均会写到store buffer中,每个CPU会在任何时刻将store buffer中结果写入到cache或者memory中。

那该如何保证数据一致性?这就要提及MESI一致性协议。

MESI 协议,是一种叫作写失效(Write Invalidate)的协议。在写失效协议里,只有一个 CPU 核心负责写入数据,其他的核心,只是同步读取到这个写入。在这个 CPU 核心写入 cache 之后,它会去广播一个“失效”请求告诉所有其他的 CPU 核心。

MESI 协议对应的四个不同的标记,分别是:

M:代表已修改(Modified)

E:代表独占(Exclusive)

S:代表共享(Shared)

I:代表已失效(Invalidated)

“已修改”用来告诉其他cpu已经修改完成,其他cpu可以向cache中写入数据。

“独占”表示数据只是加载到当前 CPU核 的store buffer中,其他的 CPU 核,并没有加载对应的数据到自己的 store buffer 里。

这个时候,如果要向独占的 store buffer 写入数据,我们可以自由地写入数据,而不需要告知其他 CPU 核。

那么对应的,共享状态就是在多核中同时加载了同一份数据。所以在共享状态下想要修改数据要先向所有的其他 CPU 核心广播一个请求,要求先把其他 CPU 核心里面的 cache ,都变成无效的状态,然后再更新当前 cache 里面的数据。

我们可以这么理解,如果变量a此刻在各个cpu的StoreBuffer中,那么CPU1核修改这个a的值,放入cache时通知其他CPU核写失效,因为同一时刻仅有一个CPU核可以写数据,但是其他CPU核是可以读数据的,那么其他核读到的数据可能是CPU1核修改之前的。这就涉及我们提到的改动序列了。

这里给大家简单介绍两个改动序列的术语

1 “synchronizes-with“ : 同步, “A synchronizes-with B” 的意思就是 A和B同步,简单来说如果多线程环境下,有一个线程先修改了变量m,我们将这个操作叫做A,之后有另一个线程读取变量m,我们将这个操作叫做B,那么B一定读取A修改m之后的最新值。也可以称作 A “happens-before“ B,即A操作的结果对B操作可见。

2 “happens-before“ : 先行,”A happens-before B” 的意思是如果A操作先于B操作,那么A操作的结果对B操作可见。”happens-before“包含很多种境况,不仅仅是我们上面提到的”synchronizes-with“,之后给大家一个脑图详细说明”happens-before“的几种情况。

我们接下来谈谈std::memory_order_relaxed。

关于std::memory_order_relaxed具备如下几个功能:

1 作用于原子变量
2 不具有synchronizes-with关系
3 对于同一个原子变量,在同一个线程中具有happens-before关系, 在同一线程中不同的原子变量不具有happens-before关系,可以乱序执行。
4 多线程情况下不具有happens-before关系。

由上述可知,如果采用最松散的内存顺序模型,在一个线程中,如果某个表达式已经看到原子变量的某个值a,则该表达式的后续表达式只能看到a或者比a更新的值。

我们看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // 1
y.store(true, std::memory_order_relaxed); // 2
}

void read_y_then_x() {
while (!y.load(std::memory_order_relaxed)) { // 3
std::cout << "y load false" << std::endl;
}

if (x.load(std::memory_order_relaxed)) { //4
++z;
}

}

上面的代码封装了两个函数,write_x_then_y负责将x和y存储为true。read_y_then_x负责读取x和y的值。

接下来我们写如下函数调用上面的两个函数

1
2
3
4
5
6
7
void TestOrderRelaxed() {
std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}

上面的代码assert断言z不为0,但有时运行到5处z会等于0触发断言。

我们从两个角度分析

1 从cpu架构分析

假设线程t1运行在CPU1上,t2运行在CPU3上,那么t1对x和y的操作,t2是看不到的。

比如当线程t1运行至1处将x设置为true,t1运行至2处将y设置为true。这些操作仅在CPU1的store buffer中,还未放入cache和memory中,CPU2自然不知道。

如果CPU1先将y放入memory,那么CPU2就会读取y的值为true。那么t2就会运行至3处从while循环退出,进而运行至4处,此时CPU1还未将x的值写入memory,

t2读取的x值为false,进而线程t2运行结束,然后CPU1将x写入true, t1结束运行,最后主线程运行至5处,因为z为0,所以触发断言。

2 从宽松内存序分析

因为memory_order_relaxed是宽松的内存序列,它只保证操作的原子性,并不能保证多个变量之间的顺序性,也不能保证同一个变量在不同线程之间的可见顺序。

比如t1可能先运行2处代码再运行1处代码,因为我们的代码会被编排成指令执行,编译器在不破坏语义的情况下(2处和1处代码无耦合,可调整顺序),2可能先于1执行。如果这样,t2运行至3处退出while循环,继续运行4处,此时t1还未执行1初代码,则t2运行4处条件不成立不会对z做增加,t2结束。这样也会导致z为0引发断言。

画个图说明上述情况

https://cdn.llfc.club/1697596279730.jpg

我们在看一个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
void TestOderRelaxed2() {
std::atomic<int> a{ 0 };
std::vector<int> v3, v4;
std::thread t1([&a]() {
for (int i = 0; i < 10; i += 2) {
a.store(i, std::memory_order_relaxed);
}
});

std::thread t2([&a]() {
for (int i = 1; i < 10; i += 2)
a.store(i, std::memory_order_relaxed);
});


std::thread t3([&v3, &a]() {
for (int i = 0; i < 10; ++i)
v3.push_back(a.load(std::memory_order_relaxed));
});

std::thread t4([&v4, &a]() {
for (int i = 0; i < 10; ++i)
v4.push_back(a.load(std::memory_order_relaxed));
});

t1.join();
t2.join();
t3.join();
t4.join();

for (int i : v3) {
std::cout << i << " ";
}

std::cout << std::endl;
for (int i : v4) {
std::cout << i << " ";
}
std::cout << std::endl;
}

线程t1向a中存储偶数,线程t2向a中存储奇数。线程t3从a读取数据写入v3中,线程t4从线程a中读取数据写入v4中。这四个线程并发执行,最后打印v3和v4的数据。
如果机器性能足够好我们看到的可能是这种输出

1
2
9 9 9 9 9 9 9 9 9 9
9 9 9 9 9 9 9 9 9 9

也可能是这种

1
2
0 1 7 6 8 9 9 9 9 9 
0 2 1 4 5 7 6 8 9 9

但我们能确定的是如果v3中7先于6,8,9等,那么v4中也是7先于6,8,9。

因为多个线程仅操作了a变量,通过memory_order_relaxed的方式仅能保证对a的操作是原子的(同一时刻仅有一个线程写a的值,但是可能多个线程读取a的值)。

但是多个线程之间操作不具备同步关系,也就是线程t1将a改为7,那么线程t3不知道a改动的最新值为7,它读到a的值为1。只是要过一阵子可能会读到7或者a变为7之后又改动的其他值。

但是t3,t4两个线程读取a的次序是一致的,比如t3和t4都读取了7和9,t3读到7在9之前,那么t4也读取到7在9之前。

因为我们memory_order_relaxed保证了多线程对同一个变量的原子操作的安全性,只是可见性会有延迟罢了。

先行(Happens-before)

Happens-before 是一个非常重要的概念. 如前文我们提及:
如果操作 a “happens-before” 操作 b, 则操作 a 的结果对于操作 b 可见. happens-before 的关系可以建立在用一个线程的两个操作之间, 也可以建立在不同的线程的两个操作之间。

顺序先行(sequenced-before)

单线程情况下前面的语句先执行,后面的语句后执行。操作a先于操作b,那么操作b可以看到操作a的结果。我们称操作a顺序先行于操作b。也就是”a sequenced-before b”。

这种情况下”a happens before b”

比如下面

1
2
3
4
5
6
int main(){
//操作a
int m = 100;
//操作b
std::cout << "m is " << std::endl;
}

上面操作b 能读取m的值为100.

“sequencde-before”具备传递性,比如操作 a “sequenced-before” 操作 b, 且操作 b “sequenced-before” 操作 m, 则操作 a “sequenced-before” 操作 m.

https://cdn.llfc.club/1697601383740.jpg

线程间先行

线程间先行又叫做”inter-thread-happens-before”,这是多线程情况的”happens-before”.

我们前面提到的”synchronizes-with” 可以构成 “happens-before”。

如果线程 1 中的操作 a “synchronizes-with” 线程 2 中的操作 b, 则操作 a “inter-thread happens-before” 操作 b.

https://cdn.llfc.club/1697609727864.jpg

此外 synchronizes-with 还可以 “后接” 一个 sequenced-before 关系组合成 inter-thread happens-before 的关系:

比如操作 a “synchronizes-with” 操作 b, 且操作 b “sequenced-before” 操作 m, 则操作 a “inter-thread happens-before” 操作 m.

https://cdn.llfc.club/1697609586958.jpg

那同样的道理, Inter-thread happens-before 关系则可以 “前接” 一个 sequenced-before 关系以延伸它的范围; 而且 inter-thread happens-before 关系具有传递性:

1 如果操作 a “sequenced-before” 操作 k, 且操作 k “inter-thread happens-before” 操作 b, 则操作 a “inter-thread happens-before” 操作 b.

https://cdn.llfc.club/1697610467141.jpg

2 如果操作 a “inter-thread happens-before” 操作 k, 且操作 k “inter-thread happens-before” 操作 b, 则操作 a “inter-thread happens-before” 操作 b.

https://cdn.llfc.club/1697610652291.jpg

依赖关系

依赖关系有 carries dependency 和 dependency-ordered before.

单线程情况下a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries a dependency into” b. 称作 a 将依赖关系带给 b, 也理解为b依赖于a。

看下面的代码

1
2
3
4
5
6
7
8
void TestDependency() {
// 1 处
std::string str = "hello world!";
// 2 处
int i = 3;
// 3 处
std::cout << str[i] << std::endl;
}

函数TestDependency内部打印str[i]的值。3处代码需要依赖1处和2处两个变量的值,所以达成依赖关系。

我们看单线程情况下按顺序执行1,2,3处代码,1 “sequenced-before” 3,且3 依赖 1的数据,则 1 “carries a dependency into” 3

同样的道理 2 “sequenced-before” 3, 且3依赖2 的数据,则2 “carries a dependency into” 3.

“carries a dependency into” 也被归为”happens-before”。

https://cdn.llfc.club/1697614479597.jpg

2 多线程情况下

线程1执行操作A(比如对i自增),线程2执行操作B(比如根据i访问字符串下表的元素), 如果线程1先于线程2执行,且操作A的结果对操作B可见,我们将这种叫做
A “dependency-ordered before” B. 有人会说这不是前面说到的A “synchronizes with “ B吗?你可以这么理解。就当作他们达到的效果是一致的,只不过A “dependency-ordered before” B 更细化一点,表述了一种依赖,比如操作A仅仅对i增加,而没有对字符串修改。而操作B需要通过i访问字符串数据。那操作B实际上是依赖于A的。

Happens-before不代表指令执行顺序

Happens-before不代表指令实际执行顺序,C++编译器可以对不相关的指令任意编排达到优化效果,Happens-before仅是C++语义层面的描述,表示 a “Happens-before” b仅能说明a操作的结果对b操作可见。

看这样一段代码

1
2
3
4
5
6
7
8
9
int  Add() {
int a = 0, b = 0;
//1 处
a++;
// 2 处
b++;
// 3 处
return a + b;
}

单线程执行上述代码,操作1一定是happens-before 操作2 的(a “sequenced-before” b),就是我们理解的 a++ 先于 b++。

但是计算机的指令可能不是这样,一条C++语句对于多条计算机指令。

有可能是先将b值放入寄存器eax做加1,再将a的值放入寄存器edx做加1,然后再将eax寄存器的值写回a,将edx写回b。

因为对于计算机来说1处操作和2处操作的顺序对于3处来说并无影响。只要3处返回a+b之前能保证a和b的值是增加过的即可。

那我们语义上的”Happens-before”有意义吗? 是有意义的,因为如果 a “sequenced-before” b, 那么无论指令如何编排,最终写入内存的顺序一定是a先于b。

只不过C++编译器不断优化尽可能不造成指令编排和语义理解的差异,上面C++的代码转换为汇编指令如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
	int a = 0, b = 0;
00A1C8F5 mov dword ptr [a],0
00A1C8FC mov dword ptr [b],0
//1 处
a++;
00A1C903 mov eax,dword ptr [a]
00A1C906 add eax,1
00A1C909 mov dword ptr [a],eax
// 2 处
b++;
00A1C90C mov eax,dword ptr [b]
00A1C90F add eax,1
00A1C912 mov dword ptr [b],eax
return a + b;
00A1C915 mov eax,dword ptr [a]
00A1C918 add eax,dword ptr [b]

可以看到C++编译器尽可能不造成语义理解和指令编排上的歧义。

脑图

我们将”happens-before” 的几种情况做成脑图,方便理解

https://cdn.llfc.club/Happens-before%E8%84%91%E5%9B%BE%20%281%29.png

我们画一个框将”happens-before” 的几种情况框起来

https://cdn.llfc.club/1697694671481.jpg

总结

本文介绍了3种内存模型,包括全局一致性模型,同步模型以及最宽松的原子模型,以及6种内存序,下一篇将介绍如何利用6中内存序达到三种模型的效果。

详细源码

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day10-MemoryModel

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

thread,async源码解析

Posted on 2023-10-11 | In C++

简介

本文件汇总粉丝提出的关于并发的几个问题,做个备份,方便大家理解和学习。

局部变量返回值

关于局部变量返回值的问题我曾在视频中说会通过构造函数返回一个局部变量给调用者,编译器会先执行拷贝构造,如果没有拷贝构造再寻找移动构造。这么说是有问题的。
有热心的粉丝查询了chatgpt,当函数返回一个类类型的局部变量时会先调用移动构造,如果没有移动构造再调用拷贝构造。
所以对于一些没有拷贝构造但是实现了移动构造的类类型也支持通过函数返回局部变量。
在 C++11 之后,编译器会默认使用移动语义(move semantics)来提高性能

看个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TestCopy {
public:
TestCopy(){}
TestCopy(const TestCopy& tp) {
std::cout << "Test Copy Copy " << std::endl;
}
TestCopy(TestCopy&& cp) {
std::cout << "Test Copy Move " << std::endl;
}
};

TestCopy TestCp() {
TestCopy tp;
return tp;
}

main 函数中调用TestCp

1
2
3
4
int main(){
TestCp();
return 0;
}

发现打印的是”Test Copy Move” .这说明优先调用的是移动构造,这也提醒我们,如果我们自定义的类实现了拷贝构造和移动构造,而这个类的移动给构造和拷贝构造实现方式不同时,要注意通过函数内部局部变量返回该类时调用移动构造是否会存在一些逻辑或安全的问题。

优先按照移动构造的方式返回局部的类对象,有一个好处就是可以返回一些只支持移动构造的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::unique_ptr<int> ReturnUniquePtr() {
std::unique_ptr<int> uq_ptr = std::make_unique<int>(100);
return uq_ptr;
}

std::thread ReturnThread() {
std::thread t([]() {
int i = 0;
while (true) {
std::cout << "i is " << i << std::endl;
i++;
if (i == 5) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});

return t;
}

main函数中调用后,可以看到线程和unique_ptr都可被函数作为局部变量返回,而且返回的线程可以继续运行。

1
2
3
4
5
6
7
int main(){
auto rt_ptr = ReturnUniquePtr();
std::cout << "rt_ptr value is " << *rt_ptr << std::endl;
std::thread rt_thread = ReturnThread();
rt_thread.join();
return 0;
}

线程归属权问题

有粉丝反馈在使用thread时遇到崩溃,主要原因在于线程归属权没有理解好,我们不能将一个线程的归属权转移给一个已经绑定线程的变量。

比如下面的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ThreadOp() {

std::thread t1([]() {
int i = 0;
while (i < 5) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});

std::thread t2([]() {
int i = 0;
while (i < 10) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});

//不能将一个线程归属权绑定给一个已经绑定线程的变量,否则会触发terminate导致崩溃
t1 = std::move(t2);
t1.join();
t2.join();
}

我们在主函数中执行上述函数,会触发崩溃如下图

https://cdn.llfc.club/1697103423170.jpg

t1已经绑定了一个线程执行循环操作直到i<5。如果在t1没运行完的情况下将t2的归属权给t1,则会引发terminate崩溃错误。

具体原因我们可以看看thread在做移动赋值时的源码

1
2
3
4
5
6
7
8
thread& operator=(thread&& _Other) noexcept {
if (joinable()) {
_STD terminate();
}

_Thr = _STD exchange(_Other._Thr, {});
return *this;
}

在线程joinable()返回true时,会触发terminate()操作,也就是被赋值的线程没有被join过,此时执行operator=操作会导致terminate()。
至于terminate()实现比较简单

1
_ACRTIMP __declspec(noreturn) void __cdecl terminate() throw();

可以看到terminate()就是抛出异常。

所以我们在之前的课程封装了了自动join的线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}

joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}


~joining_thread() noexcept {
if (joinable()) {
join();
}
}

void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}

std::thread::id get_id() const noexcept {
return _t.get_id();
}

bool joinable() const noexcept {
return _t.joinable();
}

void join() {
_t.join();
}

void detach() {
_t.detach();
}

std::thread& as_thread() noexcept {
return _t;
}

const std::thread& as_thread() const noexcept {
return _t;
}
};

thread参数值拷贝

之前讲到构造std::thread对象传递回调函数和参数,回调函数的参数绑定都是值拷贝的方式,这里再梳理一次
下面是thread的构造函数

1
2
3
4
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}

构造函数内调用了_Start函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
// 1 处
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
// 2 处
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
// 3 处
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

#pragma warning(push)
#pragma warning(disable : 5039) // pointer or reference to potentially throwing function passed to
// extern C function under -EHc. Undefined behavior may occur
// if this function throws an exception. (/Wall)
// 4处
_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));
#pragma warning(pop)

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

我们从上面的代码 1处 可以看到_Tuple是一个去引用的类型,因为其内部存储的都是decay_t过后的类型,所以无论左值引用还是右值引用到这里都变为去引用的类型。

所以2处就是将参数和函数按照值拷贝的方式存在tuple中。

3处定义了一个可调用对象_Invoker_proc

4处启动线程调用_Invoker_proc进而调用我们传递的回调函数和参数。

所以综上所述,std::thread向回调函数传递值是以副本的方式,回调函数参数是引用类型,可以将传递的实参用std::ref包装达到修改的效果。
因为std::ref其实是构造了reference_wrapper类对象,这个类实现了仿函数

1
2
3
_CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}

所以当线程接收std::ref包裹的参数时会调用仿函数通过指针解引用的方式获取外部实参,以_Ty&返回,从而达到修改的效果。

那么如下调用就会报错,提示“invoke”: 未找到匹配的重载函数。

1
2
3
4
5
6
7
8
void ChangeValue() {
int m = 100;
std::thread t1{ [](int& rm) {
rm++;
}, m };

t1.join();
}

因为 invoke函数调用时会将参数以右值的方式移动给回调函数,这会造成左值引用绑定右值的情况,所以编译报错。

改为下面这样写就没问题了

1
2
3
4
5
6
7
8
void ChangeValue() {
int m = 100;
std::thread t1{ [](int& rm) {
rm++;
}, std::ref(m) };

t1.join();
}

async注意事项

部分粉丝反馈async不能像js那样实现完全的纯异步,确实是存在这样的情况,因为于js不同,js是单线程的,而C++需要关注线程的生命周期。

我们使用async时,其实其内部调用了thread,pacakged_task,future等机制。async会返回一个future这个future如果会在被析构时等待其绑定的线程任务是否执行完成。

我们看一段cppreference.com中的描述

https://cdn.llfc.club/1697109536918.jpg

“The creator of the asynchronous operation can then use a variety of methods to query, wait for, or extract a value from the std::future. These methods may block if the asynchronous operation has not yet provided a value.”

异步操作async返回std::future, 调用者可以通过query,wait for等方式从std::future中查询状态。
但是如果async直接调用而不适用返回值则可能会阻塞。如下例子

1
2
3
4
5
6
7
8
9
10
void BlockAsync() {
std::cout << "begin block async" << std::endl;
{
std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "std::async called " << std::endl;
});
}
std::cout << "end block async" << std::endl;
}

我们在主函数调用BlockAsync(), 发现async并没有异步执行任务,而是按次序输出如下

1
2
3
begin block async
std::async called
end block async

因为async返回一个右值类型的future,无论左值还是右值,future都要被析构,因为其处于一个局部作用域{}中。
当编译器执行到}时会触发future析构。但是future析构要保证其关联的任务完成,所以需要等待任务完成future才被析构,
所以也就成了串行的效果了。

所以C++ 官方文档说 如果调用析构函数的那个future是某一shared state的最后持有者,而相关的task已启动但尚未结束,析构函数会造成阻塞,直到任务结束

至于为什么future析构要等到其关联的任务完成我们可以看一下async源码

1
2
3
4
5
6
7
8
9
10
11
12
13
template <class _Fty, class... _ArgTypes>
_NODISCARD future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>> async(
launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args) {
// manages a callable object launched with supplied policy
using _Ret = _Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>;
using _Ptype = typename _P_arg_type<_Ret>::type;
//1 处
_Promise<_Ptype> _Pr(
_Get_associated_state<_Ret>(_Policy, _Fake_no_copy_callable_adapter<_Fty, _ArgTypes...>(
_STD forward<_Fty>(_Fnarg), _STD forward<_ArgTypes>(_Args)...)));
//2 处
return future<_Ret>(_Pr._Get_state_for_future(), _Nil());
}

我们先看看_Get_associated_state的源码

1
2
3
4
5
6
7
8
9
10
11
template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(
launch _Psync, _Fty&& _Fnarg) { // construct associated asynchronous state object for the launch type
switch (_Psync) { // select launch type
case launch::deferred:
return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
case launch::async: // TRANSITION, fixed in vMajorNext, should create a new thread here
default:
return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
}
}

_Get_associated_state 做的事情很简单,根据我们不同的策略deferred还是async去构造不同的异步状态。如果是launch::async策略,我们创建一个
_Task_async_state类型的指针对象,我们将这个指针转化为_Associated_state指针返回,_Associated_state为_Task_async_state的基类。

async内 1处用该返回值构造了一个_Promise<_Ptype>类型的对象_Pr

async内 2处 用_Pr._Get_state_for_future()返回值构造了future,该返回值是_State_manager<_Ty>类型对象。

因为future继承于_State_manager,所以_Pr._Get_state_for_future()返回的值主要用来构造future的基类。

析构future时要析构future子类再析构其基类,future本身的析构没有什么,而其基类_State_manager<_Ty>析构时调用如下

1
2
3
4
5
~_State_manager() noexcept {
if (_Assoc_state) {
_Assoc_state->_Release();
}
}

看源码我们知道_Assoc_state 是 _Associated_state<_Ty> *类型

1
_Associated_state<_Ty>* _Assoc_state;

_Assoc_state * 就是我们之前在_Get_associated_state中开辟并返回的_Task_async_state*类型转化的。

我们沿着_Assoc_state->_Release一路追踪,会发现最终调用了下面的代码

1
2
3
4
5
6
7
void _Delete_this() { // delete this object
if (_Deleter) {
_Deleter->_Delete(this);
} else {
delete this;
}
}

如果没有删除器则会直接调用delete this, 会调用_Assoc_state的析构函数,因其析构函数为虚析构,进而调用_Task_async_state的析构函数

所以我们 ~_State_manager()调用的其实是_Task_async_state的析构函数, 我们看一下_Task_async_state的析构函数源码

1
2
3
4
5
6
7
virtual ~_Task_async_state() noexcept {
_Wait();
}

virtual void _Wait() override { // wait for completion
_Task.wait();
}

从源码中可以看到_Task_async_state 被析构时会等待任务完成,这也就是future需等待任务完成后才析构的原因。

仅仅介绍这个事情不是我得初衷,我们介绍一种更为隐晦的死锁情况, 看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void DeadLock() {
std::mutex mtx;
std::cout << "DeadLock begin " << std::endl;
std::lock_guard<std::mutex> dklock(mtx);
{
std::future<void> futures = std::async(std::launch::async, [&mtx]() {
std::cout << "std::async called " << std::endl;
std::lock_guard<std::mutex> dklock(mtx);
std::cout << "async working...." << std::endl;
});
}

std::cout << "DeadLock end " << std::endl;
}

上面函数的作用意图在主线程中先执行加锁,再通过async启动一个线程异步执行任务,执行的任务与主线程互斥,所以在lambda表达式中加锁。
但是这么做会造成死锁,因为主线程输出”DeadLock begin “加锁,此时async启动一个线程,那么lambda表达式会先输出”std::async called “.
但是在子线程中无法加锁成功,因为主线程没有释放锁。而主线程无法释放锁,因为主线程要等待async执行完。
因为我们上面提到过,futures处于局部作用域,即将析构,而析构又要等待任务完成,任务需要加锁,所以永远完成不了,这样就死锁了。

所以使用async要注意其返回的future是不是shared state的最后持有者。

这里有个粉丝问道能不能用async实现这样的需求

  1. 你的需求是func1 中要异步执行asyncFunc函数。
  2. func2中先收集asyncFunc函数运行的结果,只有结果正确才执行
  3. func1启动异步任务后继续执行,执行完直接退出不用等到asyncFunc运行完

如果我们理解了async的原理后不难实现如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int asyncFunc() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "this is asyncFunc" << std::endl;
return 0;
}

void func1(std::future<int>& future_ref) {
std::cout << "this is func1" << std::endl;
future_ref = std::async(std::launch::async, asyncFunc);
}

void func2(std::future<int>& future_ref) {
std::cout << "this is func2" << std::endl;
auto future_res = future_ref.get();
if (future_res == 0) {
std::cout << "get asyncFunc result success !" << std::endl;
}
else {
std::cout << "get asyncFunc result failed !" << std::endl;
return;
}
}

//提供多种思路,这是第一种
void first_method() {
std::future<int> future_tmp;
func1(future_tmp);
func2(future_tmp);
}

上面的例子我们保证在func1和func2使用的是future的引用即可。这样func1内不会因为启动async而阻塞,因为future_ref不是shared state最后持有者。

如果真的想实现一个纯异步的操作倒也不难,可以这样实现

1
2
3
4
5
6
7
8
9
10
11
12
template<typename Func, typename... Args  >
auto ParallenExe(Func&& func, Args && ... args) -> std::future<decltype(func(args...))> {
typedef decltype(func(args...)) RetType;
std::function<RetType()> bind_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
std::packaged_task<RetType()> task(bind_func);
auto rt_future = task.get_future();
std::thread t(std::move(task));

t.detach();

return rt_future;
}

上面的函数ParallenExe内部我们通过bind操作将函数和参数绑定,生成一个返回值为RetType类型,参数为void的函数bind_func。
接着我们用这个函数生成了一个packaged_task类型的对象task,这个task获取future留作以后函数结束时返回。
我们启动了一个线程处理这个task,并将这个线程detach,保证其分离独立运行。返回的rt_future并不是shared state最后持有者,因为task内部也会持有
shared_state,引用计数并不会变为0,所以并不会触发如下析构

1
2
3
4
5
void _Release() { // decrement reference count and destroy when zero
if (_MT_DECR(_Refs) == 0) {
_Delete_this();
}
}

那么我们写一个函数测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestParallen1() {
int i = 0;
std::cout << "Begin TestParallen1 ..." << std::endl;
{
ParallenExe([](int i) {
while (i < 3) {
i++;
std::cout << "ParllenExe thread func " << i << " times" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, i);
}

std::cout << "End TestParallen1 ..." << std::endl;
}

在上面的函数中我们有意让ParallenExe放在一个局部的{}中执行,意在局部作用域结束后ParallenExe返回的future引用计数-1,以此证明其引用计数是否为0,

如果引用计数为0,则会执行future的析构进而等待任务执行完成,那么看到的输出将是

1
2
3
4
5
Begin TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times
End TestParallen1 ...

如果引用计数不会为0,则不会执行future的析构函数,那么看到的输出是这样的

1
2
3
4
5
Begin TestParallen1 ...
End TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times

我们在main函数中调用做测试, 因为要防止主线程过早退出,所以我们先让主线程睡眠4秒

1
2
3
4
5
6
int main()
{
TestParallen1();
std::this_thread::sleep_for(std::chrono::seconds(4));
std::cout << "Main Exited!\n";
}

而事实证明是第二种,输出如下

1
2
3
4
5
Begin TestParallen1 ...
End TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times

由此可见我们实现了不会阻塞的并发函数,但是也会存在一些顾虑,比如我们的主线程如果不睡眠4秒,那很可能主线程退出了子线程的任务没执行完而被强制回收。
所以归根结底,这种方式我们也需要在合适的时候等待汇合,比如调用future的get或wait操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TestParallen2() {
int i = 0;
std::cout << "Begin TestParallen2 ..." << std::endl;

auto rt_future = ParallenExe([](int i) {
while (i < 3) {
i++;
std::cout << "ParllenExe thread func " << i << " times" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, i);

std::cout << "End TestParallen2 ..." << std::endl;

rt_future.wait();
}

这就是我说的,归根结底C++和js的体系不一样,C++要管理开辟的线程生命周期,我们总归要在合适的时机汇合。
所以std::async会返回future, future会判断是不是最后持有的shared_state进而帮我们做汇合操作,这并不是缺陷而是安全性的保证。至于我们不想在该处汇合,只要保证该处future不会是最后持有shared_state的即可。

总结

本文介绍了thread和async的源码级分析,回答了大家常遇到的问题。

详细源码

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day09-QASummary

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

Actor和CSP设计模式

Posted on 2023-10-06 | In C++

简介

本文介绍两种并发设计中常用的设计模式,包括Actor和CSP模式。传统的并发设计经常都是通过共享内存加锁保证逻辑安全,这种模式有几个缺点,包括1 频繁加锁影响性能,2 耦合度高。后来大家提出了Actor和CSP设计模式。

Actor设计模式

简单点说,actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
每一个类独立在一个线程里称作Actor,Actor之间通过队列通信,比如Actor1 发消息给Actor2, Actor2 发消息给Actor1都是投递到对方的队列中。好像给对方发邮件,对方从邮箱中取出一样。如下图

https://cdn.llfc.club/1696556496577.jpg

Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。

https://cdn.llfc.club/img_2f50c511653189037c3ac1a36c7962a2.png

我们在网络编程中对于逻辑层的处理就采用了将要处理的逻辑消息封装成包投递给逻辑队列,逻辑类从队列中消费的思想,其实就是一种Actor设计模式。Erlang是天然支持Actor的语言。

CSP模式

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。go是天然支持csp模式的语言。

CSP和Actor类似,只不过CSP将消息投递给channel,至于谁从channel中取数据,发送的一方是不关注的。简单的说Actor在发送消息前是直到接收方是谁,而接受方收到消息后也知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的。

https://cdn.llfc.club/csp.png

无论Actor还是CSP,他们都有一个共同的特性”Do not communicate by sharing memory; instead, share memory by communicating”

go风格的csp

我们通过生产者和消费者模型给大家演示csp模式的使用方式,用go来做示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"cspdemo/message"
"fmt"
"os"
"os/signal"
"syscall"
)

var closeChan chan struct{}
var sigs chan os.Signal

func init() {
//类似于auto
sigs = make(chan os.Signal)
//具体类型初始化
closeChan = make(chan struct{})
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//可以理解为C++ 的匿名函数,或者js的匿名函数,此处通过go原语启动一个协程并行执行
go func() {
sig := <-sigs
fmt.Println("receive signal is ", sig)
close(closeChan)
message.ConsumerInst().Exit()
message.ProducerInst().Exit()
}()
}

func main() {

fmt.Println("Main Process begin!")
<-closeChan
message.ConsumerInst().Join()
message.ProducerInst().Join()
fmt.Println("Main Process exit!")
}

在上面的代码中我们启动了一个协程监听Ctrl+C等退出操作,当收到Ctrl+C的信号后,会关闭closeChan这个channel。这样主函数中<-closeChan就会从channel中取出数据。然后等待消费者和生产者退出。

接下来我们将生产者和消费者的实现放入message包,先看下message公共数据的定义

1
2
3
4
5
package message

const MAX_COUNT = 200

var msgChan = make(chan int, MAX_COUNT)

上面的代码中我们定义了一个channel,大小为200,大家可以理解为仓库的大小为200,生产者向仓库中投递数据如果达到200就会阻塞。直到有消费者从中消费数据,如果消费者发现channel中数据为0,则阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package message

import (
"context"
"fmt"
"sync"
"time"
)

var producer *Producer = nil
var producer_once sync.Once

func init() {
// Consumer1 = new(Consumer)
//类似于C++ std::call_once
producer_once.Do(func() {
producer = new(Producer)
producer._exited = make(chan struct{})
producer._ctx, producer._cancel = context.WithCancel(context.Background())
producer.StartWork()
})
}

func ProducerInst() *Producer {
return producer
}

type Producer struct {
_exited chan struct{}
_ctx context.Context
_cancel context.CancelFunc
}

func (producer *Producer) Join() {
<-producer._exited
fmt.Println("producer exited")
}

func (producer *Producer) Exit() {
producer._cancel()
}

func (producer *Producer) StartWork() {
go func() {
i := 0
for {
i++
select {
case <-producer._ctx.Done():
{
close(producer._exited)
return
}
case msgChan <- i:
fmt.Println("producer produce number is ", i)
}

time.Sleep(50 * time.Millisecond)
}
}()
}

我们通过init函数中只调用一次的方式初始化了producer,之后生成了一个名为_exited的channel,用来通知Join返回。
同样我们还初始化了一个可以取消的context,主要是在Exit函数内调用cancel取消上下文,会触发StartWork中producer._ctx.Done()进而终止生产工作,再发出退出信号,达到优雅退出的效果。

类似的消费者也是相似逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package message

import (
"context"
"fmt"
"sync"
"time"
)

var consumer *Consumer = nil
var consumer_once sync.Once

func init() {
// Consumer1 = new(Consumer)
//类似于C++ std::call_once
consumer_once.Do(func() {
consumer = new(Consumer)
consumer._exited = make(chan struct{})
consumer._ctx, consumer._cancel = context.WithCancel(context.Background())
consumer.StartWork()
})
}

func ConsumerInst() *Consumer {
return consumer
}

type Consumer struct {
_exited chan struct{}
_ctx context.Context
_cancel context.CancelFunc
}

func (consumer *Consumer) Join() {
<-consumer._exited
fmt.Println("consumer exited")
}

func (consumer *Consumer) Exit() {
consumer._cancel()
}

func (consumer *Consumer) StartWork() {
go func() {
i := 0
for {
select {
case <-consumer._ctx.Done():
{
close(consumer._exited)
return
}
case i = <-msgChan:
fmt.Println("consumer consum number is ", i)
}

time.Sleep(100 * time.Millisecond)
}
}()
}

C++ 风格的csp

C++是万能的,我们可以用C++实现一个类似于go的channel,采用csp模式解耦合,实现类似的生产者和消费者问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>

template <typename T>
class Channel {
private:
std::queue<T> queue_;
std::mutex mtx_;
std::condition_variable cv_producer_;
std::condition_variable cv_consumer_;
size_t capacity_;
bool closed_ = false;

public:
Channel(size_t capacity = 0) : capacity_(capacity) {}

bool send(T value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_producer_.wait(lock, [this]() {
// 对于无缓冲的channel,我们应该等待直到有消费者准备好
return (capacity_ == 0 && queue_.empty()) || queue_.size() < capacity_ || closed_;
});

if (closed_) {
return false;
}

queue_.push(value);
cv_consumer_.notify_one();
return true;
}

bool receive(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_consumer_.wait(lock, [this]() { return !queue_.empty() || closed_; });

if (closed_ && queue_.empty()) {
return false;
}

value = queue_.front();
queue_.pop();
cv_producer_.notify_one();
return true;
}

void close() {
std::unique_lock<std::mutex> lock(mtx_);
closed_ = true;
cv_producer_.notify_all();
cv_consumer_.notify_all();
}
};

// 示例使用
int main() {
Channel<int> ch(10); // 10缓冲的channel

std::thread producer([&]() {
for (int i = 0; i < 5; ++i) {
ch.send(i);
std::cout << "Sent: " << i << std::endl;
}
ch.close();
});

std::thread consumer([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 故意延迟消费者开始消费
int val;
while (ch.receive(val)) {
std::cout << "Received: " << val << std::endl;
}
});

producer.join();
consumer.join();
return 0;
}

简单来说就是通过条件变量实现通信的阻塞和同步的。

利用csp思想实现取款逻辑

《C++并发编程实战》一书中提及了用csp思想实现atm机取款逻辑,我根据书中思想,整理了通信的示意图,书中部分代码存在问题,也一并修复了。

https://cdn.llfc.club/1696562243686.jpg

主函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// Actor.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//

#include <iostream>
#include "message.h"
#include "withdraw_msg.h"
#include "atm.h"
#include "dispatcher.h"
#include "bank_matchine.h"
#include "interface_matchine.h"

int main()
{
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(), interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run, &bank);
std::thread if_thread(&interface_machine::run, &interface_hardware);
std::thread atm_thread(&atm::run, &machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed = false;
while (!quit_pressed)
{
char c = getchar();
switch (c)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(c));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'c':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed = true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}

主函数中启动了三个线程,分别处理bank,machine以及interface的操作。
由于代码复杂解析来只列举atm类的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#pragma once
#include "dispatcher.h"
#include <functional>
#include <iostream>
class atm
{
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::* state)();
std::string account;
unsigned withdrawal_amount;
std::string pin;

void process_withdrawal()
{
incoming.wait().handle<withdraw_ok, std::function<void(withdraw_ok const& msg)>,
messaging::dispatcher >(
[&](withdraw_ok const& msg)
{
interface_hardware.send(
issue_money(withdrawal_amount));
bank.send(
withdrawal_processed(account, withdrawal_amount));
state = &atm::done_processing;
}, "withdraw_ok").handle<withdraw_denied, std::function<void(withdraw_denied const& msg)>>(
[&](withdraw_denied const& msg)
{
interface_hardware.send(display_insufficient_funds());
state = &atm::done_processing;
}, "withdraw_denied").handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
bank.send(
cancel_withdrawal(account, withdrawal_amount));
interface_hardware.send(
display_withdrawal_cancelled());
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void process_balance()
{
incoming.wait()
.handle<balance, std::function<void(balance const& msg)>,
messaging::dispatcher>(
[&](balance const& msg)
{
interface_hardware.send(display_balance(msg.amount));
state = &atm::wait_for_action;
},"balance"
).handle < cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void wait_for_action()
{
interface_hardware.send(display_withdrawal_options());
incoming.wait()
.handle<withdraw_pressed, std::function<void(withdraw_pressed const& msg)>,
messaging::dispatcher>(
[&](withdraw_pressed const& msg)
{
withdrawal_amount = msg.amount;
bank.send(withdraw(account, msg.amount, incoming));
state = &atm::process_withdrawal;
}, "withdraw_pressed"
).handle < balance_pressed, std::function<void(balance_pressed const& msg) >>(
[&](balance_pressed const& msg)
{
bank.send(get_balance(account, incoming));
state = &atm::process_balance;
}, "balance_pressed"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg) >>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void verifying_pin()
{
incoming.wait()
.handle<pin_verified, std::function<void(pin_verified const& msg)>,
messaging::dispatcher>(
[&](pin_verified const& msg)
{
state = &atm::wait_for_action;
}, "pin_verified"
).handle<pin_incorrect, std::function<void(pin_incorrect const& msg)>>(
[&](pin_incorrect const& msg)
{
interface_hardware.send(
display_pin_incorrect_message());
state = &atm::done_processing;
}, "pin_incorrect"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}
void getting_pin()
{
incoming.wait().handle<digit_pressed, std::function<void(digit_pressed const& msg)>,
messaging::dispatcher>(
[&](digit_pressed const& msg)
{
unsigned const pin_length = 6;
pin += msg.digit;
if (pin.length() == pin_length)
{
bank.send(verify_pin(account, pin, incoming));
state = &atm::verifying_pin;
}
}, "digit_pressed"
).handle<clear_last_pressed, std::function<void(clear_last_pressed const& msg)>>(
[&](clear_last_pressed const& msg)
{
if (!pin.empty())
{
pin.pop_back();
}
}, "clear_last_pressed"
).handle<cancel_pressed, std::function<void(cancel_pressed const& msg)>>(
[&](cancel_pressed const& msg)
{
state = &atm::done_processing;
}, "cancel_pressed"
);
}

void waiting_for_card()
{
interface_hardware.send(display_enter_card());
incoming.wait().handle<card_inserted, std::function<void(card_inserted const& msg)>,
messaging::dispatcher>(
[&](card_inserted const& msg)
{
account = msg.account;
pin = "";
interface_hardware.send(display_enter_pin());
state = &atm::getting_pin;
}, "card_inserted"
);
}
void done_processing()
{
interface_hardware.send(eject_card());
state = &atm::waiting_for_card;
}
atm(atm const&) = delete;
atm& operator=(atm const&) = delete;
public:
atm(messaging::sender bank_,
messaging::sender interface_hardware_) :
bank(bank_), interface_hardware(interface_hardware_)
{}
void done()
{
get_sender().send(messaging::close_queue());
}
void run()
{
state = &atm::waiting_for_card;
try
{
for (;;)
{
(this->*state)();
}
}
catch (messaging::close_queue const&)
{
}
}
messaging::sender get_sender()
{
return incoming;
}
};

atm 主要功能就是通过状态机不断地切换状态监听想要处理的函数。

详细源码可参考https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor

总结

本文讲述了Actor设计模式和CSP设计模式,并通过go和C++等语言给大家展示了csp并发设计的demo,最后通过讲解《C++并发编程实战》中取款的案例,展示了csp的用法。

详细源码

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day08-actor/Actor

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

利用并行和函数式编程提高运算效率

Posted on 2023-09-24 | In C++

简介

前文介绍了async用法,很多朋友说用的不多,我对async的理解就是开辟一个一次性的线程执行并行任务,主线程可以通过future在合适的时机执行等待汇总结果。本文通过并行和函数式编程,演示快速排序提升效率的一种方式。

快速排序

快速排序(Quick Sort)是一种高效的排序算法,采用分治法的思想进行排序。以下是快速排序的基本步骤:

  1. 选择一个基准元素(pivot):从数组中选择一个元素作为基准元素。选择基准元素的方式有很多种,常见的是选择数组的第一个元素或最后一个元素。
  2. 分区(partitioning):重新排列数组,把比基准元素小的元素放在它的左边,把比基准元素大的元素放在它的右边。在这个过程结束时,基准元素就处于数组的最终位置。
  3. 递归排序子数组:递归地对基准元素左边和右边的两个子数组进行快速排序。

以下是一个基本的快速排序的C++实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//c++ 版本的快速排序算法
template<typename T>
void quick_sort_recursive(T arr[], int start, int end) {
if (start >= end) return;
T key = arr[start];
int left = start, right = end;
while(left < right) {
while (arr[right] >= key && left < right) right--;
while (arr[left] <= key && left < right) left++;
std::swap(arr[left], arr[right]);
}

if (arr[left] < key) {
std::swap(arr[left], arr[start]);
}


quick_sort_recursive(arr, start, left - 1);
quick_sort_recursive(arr, left + 1, end);
}

template<typename T>
void quick_sort(T arr[], int len) {
quick_sort_recursive(arr, 0, len - 1);
}

排序演示

假设一开始序列{xi}是:5,3,7,6,4,1,0,2,9,10,8。

此时,ref=5,i=1,j=11,从后往前找,第一个比5小的数是x8=2,因此序列为:2,3,7,6,4,1,0,5,9,10,8。

此时i=1,j=8,从前往后找,第一个比5大的数是x3=7,因此序列为:2,3,5,6,4,1,0,7,9,10,8。

此时,i=3,j=8,从第8位往前找,第一个比5小的数是x7=0,因此:2,3,0,6,4,1,5,7,9,10,8。

此时,i=3,j=7,从第3位往后找,第一个比5大的数是x4=6,因此:2,3,0,5,4,1,6,7,9,10,8。

此时,i=4,j=7,从第7位往前找,第一个比5小的数是x6=1,因此:2,3,0,1,4,5,6,7,9,10,8。

此时,i=4,j=6,从第4位往后找,直到第6位才有比5大的数,这时,i=j=6,ref成为一条分界线,它之前的数都比它小,之后的数都比它大,对于前后两部分数,可以采用同样的方法来排序。

调用比较简单

1
2
3
4
5
6
7
8
9
10
11
12
void test_quick_sort() {

int num_arr[] = { 5,3,7,6,4,1,0,2,9,10,8 };
int length = sizeof(num_arr) / sizeof(int);
quick_sort(num_arr, length );
std::cout << "sorted result is ";
for (int i = 0; i < length; i++) {
std::cout << " " << num_arr[i];
}

std::cout << std::endl;
}

这种实现方式比较依赖存储数据的数据结构,比如上面是通过数组存储的,那如果我想实现list容器中元素的排序怎么办?我既不想关注存储的容器,也不想关注存储的类型,想实现一套通用的比较规则?那就需要函数式编程来解决

函数式编程

C++函数式编程是一种编程范式,它将计算视为数学上的函数求值,并避免改变状态和使用可变数据。在函数式编程中,程序是由一系列函数组成的,每个函数都接受输入并产生输出,而且没有任何副作用。

在C++中,函数式编程可以使用函数指针、函数对象(functor)和lambda表达式等机制来实现。这些机制允许您编写可以像普通函数一样调用的代码块,并将它们作为参数传递给其他函数或作为返回值返回。

C++11引入了一些新功能,如constexpr函数和表达式模板,这些功能使得在C++中进行函数式编程更加容易和直观。

我们用函数式编程修改上面的快速排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;

// ① 将input中的第一个元素放入result中,并且将这第一个元素从input中删除
result.splice(result.begin(), input, input.begin());

// ② 取result的第一个元素,将来用这个元素做切割,切割input中的列表。
T const& pivot = *result.begin();

// ③std::partition 是一个标准库函数,用于将容器或数组中的元素按照指定的条件进行分区,
// 使得满足条件的元素排在不满足条件的元素之前。
// 所以经过计算divide_point指向的是input中第一个大于等于pivot的元素
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });

// ④ 我们将小于pivot的元素放入lower_part中
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);

// ⑤我们将lower_part传递给sequential_quick_sort 返回一个新的有序的从小到大的序列
//lower_part 中都是小于divide_point的值
auto new_lower(
sequential_quick_sort(std::move(lower_part)));
// ⑥我们剩余的input列表传递给sequential_quick_sort递归调用,input中都是大于divide_point的值。
auto new_higher(
sequential_quick_sort(std::move(input)));
//⑦到此时new_higher和new_lower都是从小到大排序好的列表
//将new_higher 拼接到result的尾部
result.splice(result.end(), new_higher);
//将new_lower 拼接到result的头部
result.splice(result.begin(), new_lower);
return result;
}

用如下方式调用

1
2
3
4
5
6
7
8
9
void test_sequential_quick() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = sequential_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

这个函数是一个使用快速排序算法对链表进行排序的实现。快速排序是一种常用的排序算法,它的基本思想是选择一个基准元素,然后将数组分为两部分,一部分是小于基准元素的元素,另一部分是大于基准元素的元素。然后对这两部分再分别进行快速排序。这个函数使用了C++的模板,可以处理任何数据类型的链表。函数的主要步骤包括:

  1. 将链表的第一个元素作为基准元素,并将其从链表中删除。

  2. 使用std::partition函数将链表分为两部分,一部分是小于基准元素的元素,另一部分是大于或等于基准元素的元素。

  3. 对这两部分分别进行递归排序。\n4. 将排序后的两部分和基准元素合并,返回排序后的链表。

并行方式

我们提供并行方式的函数式编程,可以极大的利用cpu多核的优势,这在并行计算中很常见。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//并行版本
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);
// ①因为lower_part是副本,所以并行操作不会引发逻辑错误,这里可以启动future做排序
std::future<std::list<T>> new_lower(
std::async(&parallel_quick_sort<T>, std::move(lower_part)));

// ②
auto new_higher(
parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

测试调用如下

1
2
3
4
5
6
7
8
9
void test_sequential_quick() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = sequential_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

我们对lower_part的排序调用了std::async并行处理。而higher_part则是串行执行的。这么做提高了计算的并行能力,但有人会问如果数组的大小为1024,那么就是2的10次方,需要启动10个线程执行,这仅是对一个1024大小的数组的排序,如果有多个数组排序,开辟线程会不会很多?其实不用担心这个,因为async的实现方式在上一节中已经提及了,是通过std::launch::async或者std::launch::deffered完成的。编译器会计算当前能否开辟线程,如果能则使用std::launch::async模式开辟线程,如果不能则采用std::launch::deffered串行执行。当然,我们也可以通过我们上文提及的线程池方式实现并行计算

ThreadPool方式的并行排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//线程池版本
//并行版本
template<typename T>
std::list<T> thread_pool_quick_sort(std::list<T> input)
{
if (input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
T const& pivot = *result.begin();
auto divide_point = std::partition(input.begin(), input.end(),
[&](T const& t) {return t < pivot; });
std::list<T> lower_part;
lower_part.splice(lower_part.end(), input, input.begin(),
divide_point);
// ①因为lower_part是副本,所以并行操作不会引发逻辑错误,这里投递给线程池处理
auto new_lower = ThreadPool::commit(&parallel_quick_sort<T>, std::move(lower_part));
// ②
auto new_higher(
parallel_quick_sort(std::move(input)));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

通过如下方式测试

1
2
3
4
5
6
7
8
9
void test_thread_pool_sort() {
std::list<int> numlists = { 6,1,0,7,5,2,9,-1 };
auto sort_result = thread_pool_quick_sort(numlists);
std::cout << "sorted result is ";
for (auto iter = sort_result.begin(); iter != sort_result.end(); iter++) {
std::cout << " " << (*iter);
}
std::cout << std::endl;
}

到此我们实现了多种版本的快速排序,并不是鼓励读者造轮子,而是提供一种并行处理的思想,相信读者在后续的工作中在合适的时机采用并行处理的方式,可以极大的提高程序处理问题的效率。

总结

本文介绍了如何使用future, promise以及async用法

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

C++ 并发三剑客future, promise和async

Posted on 2023-09-17 | In C++

简介

本文介绍C++ 并发三剑客, future, promise以及async用法。这三个类是实现并发技术的关键,接下来详细介绍一下

async用法

std::async 是一个用于异步执行函数的模板函数,它返回一个 std::future 对象,该对象用于获取函数的返回值。

以下是一个使用 std::async 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <future>
#include <chrono>

// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {
// 模拟一个异步任务,比如从数据库中获取数据
std::this_thread::sleep_for(std::chrono::seconds(5));
return "Data: " + query;
}

int main() {
// 使用 std::async 异步调用 fetchDataFromDB
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");

// 在主线程中做其他事情
std::cout << "Doing something else..." << std::endl;

// 从 future 对象中获取数据
std::string dbData = resultFromDB.get();
std::cout << dbData << std::endl;

return 0;
}

在这个示例中,std::async 创建了一个新的线程(或从内部线程池中挑选一个线程)并自动与一个 std::promise 对象相关联。std::promise 对象被传递给 fetchDataFromDB 函数,函数的返回值被存储在 std::future 对象中。在主线程中,我们可以使用 std::future::get 方法从 std::future 对象中获取数据。注意,在使用 std::async 的情况下,我们必须使用 std::launch::async 标志来明确表明我们希望函数异步执行。

上面的例子输出

1
2
Doing something else...
Data: Data

async的启动策略

std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义。除了std::launch::async之外,还有以下启动策略:

  1. std::launch::deferred:这种策略意味着任务将在调用std::future::get()或std::future::wait()函数时延迟执行。换句话说,任务将在需要结果时同步执行。
  2. std::launch::async | std::launch::deferred:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。

默认情况下,std::async使用std::launch::async | std::launch::deferred策略。这意味着任务可能异步执行,也可能延迟执行,具体取决于实现。需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。

future的wait和get

std::future::get() 和 std::future::wait() 是 C++ 中用于处理异步任务的两个方法,它们的功能和用法有一些重要的区别。

  1. std::future::get():

std::future::get() 是一个阻塞调用,用于获取 std::future 对象表示的值或异常。如果异步任务还没有完成,get() 会阻塞当前线程,直到任务完成。如果任务已经完成,get() 会立即返回任务的结果。重要的是,get() 只能调用一次,因为它会移动或消耗掉 std::future 对象的状态。一旦 get() 被调用,std::future 对象就不能再被用来获取结果。
2. std::future::wait():

std::future::wait() 也是一个阻塞调用,但它与 get() 的主要区别在于 wait() 不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait() 会立即返回。如果任务还没有完成,wait() 会阻塞当前线程,直到任务完成。与 get() 不同,wait() 可以被多次调用,它不会消耗掉 std::future 对象的状态。

总结一下,这两个方法的主要区别在于:

  • std::future::get() 用于获取并返回任务的结果,而 std::future::wait() 只是等待任务完成。
  • get() 只能调用一次,而 wait() 可以被多次调用。
  • 如果任务还没有完成,get() 和 wait() 都会阻塞当前线程,但 get() 会一直阻塞直到任务完成并返回结果,而 wait() 只是在等待任务完成。

你可以使用std::future的wait_for()或wait_until()方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status值。

1
2
3
4
5
if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {  
// 操作已完成
} else {
// 操作尚未完成
}

将任务和future关联

std::packaged_task和std::future是C++11中引入的两个类,它们用于处理异步任务的结果。

std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用。

std::future代表一个异步操作的结果。它可以用于从异步任务中获取返回值或异常。

以下是使用std::packaged_task和std::future的基本步骤:

  1. 创建一个std::packaged_task对象,该对象包装了要执行的任务。
  2. 调用std::packaged_task对象的get_future()方法,该方法返回一个与任务关联的std::future对象。
  3. 在另一个线程上调用std::packaged_task对象的operator(),以执行任务。
  4. 在需要任务结果的地方,调用与任务关联的std::future对象的get()方法,以获取任务的返回值或异常。

以下是一个简单的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}

void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task);

// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();

// 在另一个线程上执行任务
std::thread t(std::move(task));
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成

// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;

}

在上面的示例中,我们创建了一个包装了任务的std::packaged_task对象,并获取了与任务关联的std::future对象。然后,我们在另一个线程上执行任务,并等待任务完成并获取结果。最后,我们输出结果。

我们可以使用 std::function 和 std::package_task 来包装带参数的函数。std::package_task 是一个模板类,它包装了一个可调用对象,并允许我们将其作为异步任务传递。

promise 用法

C++11引入了std::promise和std::future两个类,用于实现异步编程。std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。

下面是std::promise的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <future>

void set_value(std::promise<int> prom) {
// 设置 promise 的值
prom.set_value(10);
}

int main() {
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
std::future<int> fut = prom.get_future();
// 在新线程中设置 promise 的值
std::thread t(set_value, std::move(prom));
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
return 0;
}

程序输出

1
2
3
Waiting for the thread to set the value...
promise set value successValue set by the thread:
10

在上面的代码中,我们首先创建了一个std::promise<int>对象,然后通过调用get_future()方法获取与之相关联的std::future<int>对象。然后,我们在新线程中通过调用set_value()方法设置promise的值,并在主线程中通过调用fut.get()方法获取这个值。注意,在调用fut.get()方法时,如果promise的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。

除了set_value()方法外,std::promise还有一个set_exception()方法,用于设置异常。该方法接受一个std::exception_ptr参数,该参数可以通过调用std::current_exception()方法获取。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <thread>
#include <future>

void set_exception(std::promise<void> prom) {
try {
// 抛出一个异常
throw std::runtime_error("An error occurred!");
} catch(...) {
// 设置 promise 的异常
prom.set_exception(std::current_exception());
}
}

int main() {
// 创建一个 promise 对象
std::promise<void> prom;
// 获取与 promise 相关联的 future 对象
std::future<void> fut = prom.get_future();
// 在新线程中设置 promise 的异常
std::thread t(set_exception, std::move(prom));
// 在主线程中获取 future 的异常
try {
std::cout << "Waiting for the thread to set the exception...\n";
fut.get();
} catch(const std::exception& e) {
std::cout << "Exception set by the thread: " << e.what() << '\n';
}
t.join();
return 0;
}

上述代码输出

1
2
Waiting for the thread to set the exception...
Exception set by the thread: An error occurred!

当然我们使用promise时要注意一点,如果promise被释放了,而其他的线程还未使用与promise关联的future,当其使用这个future时会报错。如下是一段错误展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void use_promise_destruct() {
std::thread t;
std::future<int> fut;
{
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
fut = prom.get_future();
// 在新线程中设置 promise 的值
t = std::thread(set_value, std::move(prom));
}
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
}

随着局部作用域}的结束,prom可能被释放也可能会被延迟释放,
如果立即释放则fut.get()获取的值会报error_value的错误。

共享类型的future

当我们需要多个线程等待同一个执行结果时,需要使用std::shared_future

以下是一个适合使用std::shared_future的场景,多个线程等待同一个异步操作的结果:

假设你有一个异步任务,需要多个线程等待其完成,然后这些线程需要访问任务的结果。在这种情况下,你可以使用std::shared_future来共享异步任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void myFunction(std::promise<int>&& promise) {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置 promise 的值
}

void threadFunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
catch (const std::future_error& e) {
std::cout << "Future error: " << e.what() << std::endl;
}
}

void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

// 使用 share() 方法获取新的 shared_future 对象

std::thread myThread2(threadFunction, future);

std::thread myThread3(threadFunction, future);

myThread1.join();
myThread2.join();
myThread3.join();
}

在这个示例中,我们创建了一个std::promise<int>对象promise和一个与之关联的std::shared_future<int>对象future。然后,我们将promise对象移动到另一个线程myThread1中,该线程将执行myFunction函数,并在完成后设置promise的值。我们还创建了两个线程myThread2和myThread3,它们将等待future对象的结果。如果myThread1成功地设置了promise的值,那么future.get()将返回该值。这些线程可以同时访问和等待future对象的结果,而不会相互干扰。

但是大家要注意,如果一个future被移动给两个shared_future是错误的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

std::thread myThread2(threadFunction, std::move(future));
std::thread myThread3(threadFunction, std::move(future));

myThread1.join();
myThread2.join();
myThread3.join();
}

这种用法是错误的,一个future通过隐式构造传递给shared_future之后,这个shared_future被移动传递给两个线程是不合理的,因为第一次移动后shared_future的生命周期被转移了,接下来myThread3构造时用的std::move(future)future已经失效了,会报错,一般都是no state 之类的错误。

异常处理

std::future 是C++的一个模板类,它用于表示一个可能还没有准备好的异步操作的结果。你可以通过调用 std::future::get 方法来获取这个结果。如果在获取结果时发生了异常,那么 std::future::get 会重新抛出这个异常。

以下是一个例子,演示了如何在 std::future 中获取异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <future>
#include <stdexcept>
#include <thread>

void may_throw()
{
// 这里我们抛出一个异常。在实际的程序中,这可能在任何地方发生。
throw std::runtime_error("Oops, something went wrong!");
}

int main()
{
// 创建一个异步任务
std::future<void> result(std::async(std::launch::async, may_throw));

try
{
// 获取结果(如果在获取结果时发生了异常,那么会重新抛出这个异常)
result.get();
}
catch (const std::exception &e)
{
// 捕获并打印异常
std::cerr << "Caught exception: " << e.what() << std::endl;
}

return 0;
}

在这个例子中,我们创建了一个异步任务 may_throw,这个任务会抛出一个异常。然后,我们创建一个 std::future 对象 result 来表示这个任务的结果。在 main 函数中,我们调用 result.get() 来获取任务的结果。如果在获取结果时发生了异常,那么 result.get() 会重新抛出这个异常,然后我们在 catch 块中捕获并打印这个异常。

上面的例子输出

1
Caught exception: Oops, something went wrong!

线程池

我们可以利用上面提到的std::packaged_task和std::promise构建线程池,提高程序的并发能力。
先了解什么是线程池:

线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池可以避免在处理短时间任务时创建与销毁线程的代价,它维护着多个线程,等待着监督管理者分配可并发执行的任务,从而提高了整体性能。

下面是我提供的一套线程池源码,目前用在公司的项目中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class ThreadPool {
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

static ThreadPool& instance() {
static ThreadPool ins;
return ins;
}

using Task = std::packaged_task<void()>;


~ThreadPool() {
stop();
}

template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load())
return std::future<RetType>{};

auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(cv_mt_);
tasks_.emplace([task] { (*task)(); });
}
cv_lock_.notify_one();
return ret;
}

int idleThreadCount() {
return thread_num_;
}

private:
ThreadPool(unsigned int num = 5)
: stop_(false) {
{
if (num < 1)
thread_num_ = 1;
else
thread_num_ = num;
}
start();
}
void start() {
for (int i = 0; i < thread_num_; ++i) {
pool_.emplace_back([this]() {
while (!this->stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {
return this->stop_.load() || !this->tasks_.empty();
});
if (this->tasks_.empty())
return;

task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->thread_num_--;
task();
this->thread_num_++;
}
});
}
}
void stop() {
stop_.store(true);
cv_lock_.notify_all();
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}

private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
std::atomic_bool stop_;
std::atomic_int thread_num_;
std::queue<Task> tasks_;
std::vector<std::thread> pool_;
};

#endif // !__THREAD_POOL_H__

总结

本文介绍了如何使用future, promise以及async用法

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

利用条件变量构造线程安全队列

Posted on 2023-09-09 | In C++

简介

本文介绍如何使用条件变量控制并发的同步操作,试想有一个线程A一直输出1,另一个线程B一直输出2。我想让两个线程交替输出1,2,1,2…之类的效果,该如何实现?有的同学可能会说不是有互斥量mutex吗?可以用一个全局变量num表示应该哪个线程输出,比如num为1则线程A输出1,num为2则线程B输出2,mutex控制两个线程访问num,如果num和线程不匹配,就让该线程睡一会,这不就实现了吗?比如线程A加锁后发现当前num为2则表示它不能输出1,就解锁,将锁的使用权交给线程A,线程B就sleep一会。

不良实现

上面说的方式可以实现我们需要的功能,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void PoorImpleman() {
std::thread t1([]() {
for (;;) {

{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));

}

});

std::thread t2([]() {
for (;;) {

{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));

}

});

t1.join();
t2.join();
}

PoorImpleman虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。所以我们提出了用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。

条件变量

我们这里用条件变量实现上面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {

std::unique_lock<std::mutex> lock(mtx_num);
cvA.wait(lock, []() {
return num == 1;
});

num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one();
}

});

std::thread t2([]() {
for (;;) {

std::unique_lock<std::mutex> lock(mtx_num);
cvB.wait(lock, []() {
return num == 2;
});

num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one();
}

});

t1.join();
t2.join();
}

当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。
这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,sleep的方式看出日志基本是每隔1秒才打印一次,效率不高。

线程安全队列

之前我们实现过线程安全的栈,对于pop操作,我们如果在线程中调用empty判断是否为空,如果不为空,则pop,因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。对于智能指针版本我们发现队列为空则返回空指针,对于引用版本,
发现队列为空则抛出异常,这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};

threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); ⇽--- ②
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data))
break;
}
}

我们可以启动三个线程,一个producer线程用来向队列中放入数据。一个consumer1线程用来阻塞等待pop队列中的元素。

另一个consumer2尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。

打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出

测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print;
std::thread producer(
[&]() {
for (int i = 0; ;i++) {
safe_que.push(i);
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}

std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);

std::thread consumer1(
[&]() {
for (;;) {
auto data = safe_que.wait_and_pop();
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << *data << std::endl;
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);

std::thread consumer2(
[&]() {
for (;;) {
auto data = safe_que.try_pop();
if (data != nullptr) {
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << *data << std::endl;
}

}

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);

producer.join();
consumer1.join();
consumer2.join();
}

测试效果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
producer push data is 0
consumer1 wait and pop data is 0
producer push data is 1
producer push data is 2
consumer2 try_pop data is 1
consumer1 wait and pop data is 2
producer push data is 3
producer push data is 4
consumer2 try_pop data is 3
consumer1 wait and pop data is 4
producer push data is 5
producer push data is 6
producer push data is 7
consumer2 try_pop data is 5
consumer1 wait and pop data is 6

我们能看到consumer1和consumer2是并发消费的

总结

本文介绍了如何通过条件变量实现并发线程的同步处理。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

线程安全的单例模式

Posted on 2023-09-03 | In C++

简介

本文介绍C++ 线程安全的单例模式如何实现,通过介绍单例模式的演变历程,给读者更完备的实现单例模式的方案。

局部静态变量

我们知道当一个函数中定义一个局部静态变量,那么这个局部静态变量只会初始化一次,就是在这个函数第一次调用的时候,以后无论调用几次这个函数,函数内的局部静态变量都不再初始化。
那我们可以利用局部静态变量这一特点实现单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Single2 {
private:
Single2()
{
}
Single2(const Single2&) = delete;
Single2& operator=(const Single2&) = delete;
public:
static Single2& GetInst()
{
static Single2 single;
return single;
}
};

上述版本的单例模式在C++11 以前存在多线程不安全的情况,编译器可能会初始化多个静态变量。
但是C++11推出以后,各厂商优化编译器,能保证线程安全。所以为了保证运行安全请确保使用C++11以上的标准。

饿汉式初始化

在C++11 推出以前,局部静态变量的方式实现单例存在线程安全问题,所以部分人推出了一种方案,就是在主线程启动后,其他线程没有启动前,由主线程先初始化单例资源,这样其他线程获取的资源就不涉及重复初始化的情况了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//饿汉式
class Single2Hungry
{
private:
Single2Hungry()
{
}
Single2Hungry(const Single2Hungry&) = delete;
Single2Hungry& operator=(const Single2Hungry&) = delete;
public:
static Single2Hungry* GetInst()
{
if (single == nullptr)
{
single = new Single2Hungry();
}
return single;
}
private:
static Single2Hungry* single;
};

调用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//饿汉式初始化
Single2Hungry* Single2Hungry::single = Single2Hungry::GetInst();
void thread_func_s2(int i)
{
std::cout << "this is thread " << i << std::endl;
std::cout << "inst is " << Single2Hungry::GetInst() << std::endl;
}
void test_single2hungry()
{
std::cout << "s1 addr is " << Single2Hungry::GetInst() << std::endl;
std::cout << "s2 addr is " << Single2Hungry::GetInst() << std::endl;
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_s2, i);
tid.join();
}
}

饿汉式是从使用角度规避多线程的安全问题,很多情况下我们很难从规则角度限制开发人员,所以这种方式不是很推荐。

懒汉式初始化

很多人觉得什么时候调用初始化是用户的权利,不应该加以限制,所以就有了懒汉式方式初始化资源,在用到时如果没有初始化单例则初始化,如果初始化了则直接使用.
所以这种方式我们要加锁,防止资源被重复初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SinglePointer
{
private:
SinglePointer()
{
}
SinglePointer(const SinglePointer&) = delete;
SinglePointer& operator=(const SinglePointer&) = delete;
public:
static SinglePointer* GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = new SinglePointer();
s_mutex.unlock();
return single;
}
private:
static SinglePointer* single;
static std::mutex s_mutex;
};

调用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SinglePointer* SinglePointer::single = nullptr;
std::mutex SinglePointer::s_mutex;
void thread_func_lazy(int i)
{
std::cout << "this is lazy thread " << i << std::endl;
std::cout << "inst is " << SinglePointer::GetInst() << std::endl;
}
void test_singlelazy()
{
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_lazy, i);
tid.join();
}
//何时释放new的对象?造成内存泄漏
}

这种方式存在一个很严重的问题,就是当多个线程都调用单例函数时,我们不确定资源是被哪个线程初始化的。
回收指针存在问题,存在多重释放或者不知道哪个指针释放的问题。

智能指针

我们能想到一个自动初始化资源并且自动释放的方式就是智能指针。利用智能指针自动回收资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//可以利用智能指针完成自动回收

class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

调用方式如下

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;
void test_singleauto()
{
auto sp1 = SingleAuto::GetInst();
auto sp2 = SingleAuto::GetInst();
std::cout << "sp1 is " << sp1 << std::endl;
std::cout << "sp2 is " << sp2 << std::endl;
//此时存在隐患,可以手动删除裸指针,造成崩溃
// delete sp1.get();
}

这样开辟的资源交给智能指针管理免去了回收资源的麻烦。
但是有些人觉得虽然智能指针能自动回收内存,如果有开发人员手动delete指针怎么办?
所以有人提出了利用辅助类帮助智能指针释放资源,将智能指针的析构设置为私有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//为了规避用户手动释放内存,可以提供一个辅助类帮忙回收内存
//并将单例类的析构函数写为私有

class SingleAutoSafe;
class SafeDeletor
{
public:
void operator()(SingleAutoSafe* sf)
{
std::cout << "this is safe deleter operator()" << std::endl;
delete sf;
}
};
class SingleAutoSafe
{
private:
SingleAutoSafe() {}
~SingleAutoSafe()
{
std::cout << "this is single auto safe deletor" << std::endl;
}
SingleAutoSafe(const SingleAutoSafe&) = delete;
SingleAutoSafe& operator=(const SingleAutoSafe&) = delete;
//定义友元类,通过友元类调用该类析构函数
friend class SafeDeletor;
public:
static std::shared_ptr<SingleAutoSafe> GetInst()
{
//1处
if (single != nullptr)
{
return single;
}
s_mutex.lock();
//2处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器
//3 处
single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDeletor());
//也可以指定删除函数
// single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAutoSafe> single;
static std::mutex s_mutex;
};

SafeDeletor就是删除的辅助类,实现了仿函数。构造智能指针时指定了SafeDeletor对象,这样就能帮助智能指针释放了。

但是上面的代码存在危险,比如懒汉式的使用方式,当多个线程调用单例时,有一个线程加锁进入3处的逻辑。
其他的线程有的在1处,判断指针非空则跳过初始化直接使用单例的内存会存在问题。
主要原因在于SingleAutoSafe * temp = new SingleAutoSafe() 这个操作是由三部分组成的
1 调用allocate开辟内存
2 调用construct执行SingleAutoSafe的构造函数
3 调用赋值操作将地址赋值给temp

而现实中2和3的步骤可能颠倒,所以有可能在一些编译器中通过优化是1,3,2的调用顺序,
其他线程取到的指针就是非空,还没来的及调用构造函数就交给外部使用造成不可预知错误。
为解决这个问题,C++11 推出了std::call_once函数保证多个线程只执行一次

call_once

C++11 提出了call_once函数,我们可以配合一个局部的静态变量once_flag实现线程安全的初始化。
多线程调用call_once函数时,会判断once_flag是否被初始化,如没被初始化则进入初始化流程,调用我们提供的初始化函数。
但是同一时刻只有一个线程能进入这个初始化函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SingletonOnce {
private:
SingletonOnce() = default;
SingletonOnce(const SingletonOnce&) = delete;
SingletonOnce& operator = (const SingletonOnce& st) = delete;
static std::shared_ptr<SingletonOnce> _instance;

public :
static std::shared_ptr<SingletonOnce> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<SingletonOnce>(new SingletonOnce);
});

return _instance;
}

void PrintAddress() {
std::cout << _instance.get() << std::endl;
}

~SingletonOnce() {
std::cout << "this is singleton destruct" << std::endl;
}
};

std::shared_ptr<SingletonOnce> SingletonOnce::_instance = nullptr;

调用方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestSingle() {

std::thread t1([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});

std::thread t2([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});

t1.join();
t2.join();
}

为了使用单例类更通用,比如项目中使用多个单例类,可以通过继承实现多个单例类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//为了让单例更加通用,可以做成模板类
template <typename T>
class Singleton {
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

比如我们想实现单例类,就像我们之前在网络编程中介绍的那样,可以通过继承实现单例模式

1
2
3
4
5
6
7
8
9
//想使用单例类,可以继承上面的模板,我们在网络编程中逻辑单例类用的就是这种方式
class LogicSystem :public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>;
public:
~LogicSystem(){}
private:
LogicSystem(){}
};

总结

如果你只是实现一个简单的单例类推荐使用返回局部静态变量的方式
如果想大规模实现多个单例类可以用call_once实现的模板类。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(4) unique_lock,读写锁以及递归锁

Posted on 2023-08-31 | In C++

简介

本文介绍C++ 并发中使用的其他类型的锁,包括unique_lock,shared_lock, 以及recursive_lock等。shared_lock和unique_lock比较常用,而recursive_lock用的不多,或尽可能规避用这种锁。

unique_lock

unique_lock和lock_guard基本用法相同,构造时默认加锁,析构时默认解锁,但unique_lock有个好处就是可以手动解锁。这一点尤为重要,方便我们控制锁住区域的粒度(加锁的范围大小),也能支持和条件变量配套使用,至于条件变量我们之后再介绍,本文主要介绍锁的相关操作。

1
2
3
4
5
6
7
8
9
10
//unique_lock 基本用法
std::mutex mtx;
int shared_data = 0;
void use_unique() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
std::cout << "lock success" << std::endl;
shared_data++;
lock.unlock();
}

我们可以通过unique_lock的owns_lock判断是否持有锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//可判断是否占有锁
void owns_lock() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}

lock.unlock();
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
}

上述代码输出

1
2
owns lock
doesn't own lock

unique_lock可以延迟加锁

1
2
3
4
5
6
7
8
9
 //可以延迟加锁
void defer_lock() {
//延迟加锁
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
//可以加锁
lock.lock();
//可以自动析构解锁,也可以手动解锁
lock.unlock();
}

那我们写一段代码综合运用owns_lock和defer_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//同时使用owns和defer
void use_own_defer() {
std::unique_lock<std::mutex> lock(mtx);
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Main thread has the lock." << std::endl;
}
else
{
std::cout << "Main thread does not have the lock." << std::endl;
}

std::thread t([]() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);

// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}

// 加锁
lock.lock();

// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}

// 解锁
lock.unlock();
});
t.join();
}

上述代码回依次输出, 但是程序会阻塞,因为子线程会卡在加锁的逻辑上,因为主线程未释放锁,而主线程又等待子线程退出,导致整个程序卡住。

1
2
Main thread has the lock.
Thread does not have the lock.

和lock_guard一样,unique_lock也支持领养锁

1
2
3
4
5
6
7
8
9
10
11
12
//同样支持领养操作
void use_own_adopt() {
mtx.lock();
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock);
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "does not have the lock" << std::endl;
}
lock.unlock();
}

尽管是领养的,但是打印还是会出现owns lock,因为不管如何锁被加上,就会输出owns lock。

既然unique_lock支持领养操作也支持延迟加锁,那么可以用两种方式实现前文lock_guard实现的swap操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//之前的交换代码可以可以用如下方式等价实现
int a = 10;
int b = 99;
std::mutex mtx1;
std::mutex mtx2;

void safe_swap() {
std::lock(mtx1, mtx2);
std::unique_lock<std::mutex> lock1(mtx1, std::adopt_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::adopt_lock);
std::swap(a, b);
//错误用法
//mtx1.unlock();
//mtx2.unlock();
}

void safe_swap2() {

std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
//需用lock1,lock2加锁
std::lock(lock1, lock2);
//错误用法
//std::lock(mtx1, mtx2);
std::swap(a, b);
}

大家注意一旦mutex被unique_lock管理,加锁和释放的操作就交给unique_lock,不能调用mutex加锁和解锁,因为锁的使用权已经交给unique_lock了。

我们知道mutex是不支持移动和拷贝的,但是unique_lock支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移其归属权.

1
2
3
4
5
6
7
8
9
10
11
12
//转移互斥量所有权
//互斥量本身不支持move操作,但是unique_lock支持
std::unique_lock <std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
return lock;
}

void use_return() {
std::unique_lock<std::mutex> lock(get_lock());
shared_data++;
}

锁的粒度表示加锁的精细程度,一个锁的粒度要足够大,保证可以锁住要访问的共享数据。

同时一个锁的粒度要足够小,保证非共享数据不被锁住影响性能。

而unique_ptr则很好的支持手动解锁。

1
2
3
4
5
6
7
8
9
void precision_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
lock.unlock();
//不设计共享数据的耗时操作不要放在锁内执行
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock();
shared_data++;
}

共享锁

试想这样一个场景,对于一个DNS服务,我们可以根据域名查询服务对应的ip地址,它很久才更新一次,比如新增记录,删除记录或者更新记录等。平时大部分时间都是提供给外部查询,对于查询操作,即使多个线程并发查询不加锁也不会有问题,但是当有线程修改DNS服务的ip记录或者增减记录时,其他线程不能查询,需等待修改完再查询。或者等待查询完,线程才能修改。也就是说读操作并不是互斥的,同一时间可以有多个线程同时读,但是写和读是互斥的,写与写是互斥的,简而言之,写操作需要独占锁。而读操作需要共享锁。

要想使用共享锁,需使用共享互斥量std::shared_mutex,std::shared_mutex是C++17标准提出的。
C++14标准可以使用std::shared_time_mutex,

std::shared_mutex 和 std::shared_timed_mutex 都是用于实现多线程并发访问共享数据的互斥锁,但它们之间存在一些区别:

  1. std::shared_mutex:
* 提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及 `try_lock_until()` 函数,这些函数都可以用于获取互斥锁。
* 提供了 `try_lock_shared()` 和 `lock_shared()` 函数,这些函数可以用于获取共享锁。
* 当 `std::shared_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁。
  1. std::shared_timed_mutex:
* 与 `std::shared_mutex` 类似,也提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及 `try_lock_until()` 函数用于获取互斥锁。
* 与 `std::shared_mutex` 不同的是,它还提供了 `try_lock_shared()` 和 `lock_shared()` 函数用于获取共享锁,这些函数在尝试获取共享锁时具有超时机制。
* 当 `std::shared_timed_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁,这与 `std::shared_mutex` 相同。然而,当尝试获取共享锁时,如果不能立即获得锁,`std::shared_timed_mutex` 会设置一个超时,超时过后如果仍然没有获取到锁,则操作将返回失败。

因此,std::shared_timed_mutex 提供了额外的超时机制,这使得它在某些情况下更适合于需要处理超时的并发控制。然而,如果不需要超时机制,可以使用更简单的 std::shared_mutex。

C++11标准没有共享互斥量,可以使用boost提供的boost::shared_mutex。

如果我们想构造共享锁,可以使用std::shared_lock,如果我们想构造独占锁, 可以使用std::lock_gurad.

我们用一个类DNService代表DNS服务,查询操作使用共享锁,而写操作使用独占锁,可以是如下方式的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DNService {
public:
DNService() {}
//读操作采用共享锁
std::string QueryDNS(std::string dnsname) {
std::shared_lock<std::shared_mutex> shared_locks(_shared_mtx);
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
return iter->second;
}

return "";
}

//写操作采用独占锁
void AddDNSInfo(std::string dnsname, std::string dnsentry) {
std::lock_guard<std::shared_mutex> guard_locks(_shared_mtx);
_dns_info.insert(std::make_pair(dnsname, dnsentry));
}
private:
std::map<std::string, std::string> _dns_info;
mutable std::shared_mutex _shared_mtx;
};

QueryDNS 用来查询dns信息,多个线程可同时访问。
AddDNSInfo 用来添加dns信息,属独占锁,同一时刻只有一个线程在修改。

递归锁

有时候我们在实现接口的时候内部加锁,接口内部调用完结束自动解锁。会出现一个接口调用另一个接口的情况,如果用普通的std::mutex就会出现卡死,因为嵌套加锁导致卡死。但是我们可以使用递归锁。

但我个人并不推荐递归锁,可以从设计源头规避嵌套加锁的情况,我们可以将接口相同的功能抽象出来,统一加锁。下面的设计演示了如何使用递归锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class RecursiveDemo {
public:
RecursiveDemo() {}
bool QueryStudent(std::string name) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
return false;
}

return true;
}

void AddScore(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
if (!QueryStudent(name)) {
_students_info.insert(std::make_pair(name, score));
return;
}

_students_info[name] = _students_info[name] + score;
}

//不推荐采用递归锁,使用递归锁说明设计思路并不理想,需优化设计
//推荐拆分逻辑,将共有逻辑拆分为统一接口
void AddScoreAtomic(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
_students_info.insert(std::make_pair(name, score));
return;
}

_students_info[name] = _students_info[name] + score;
return;
}
private:

std::map<std::string, int> _students_info;
std::recursive_mutex _recursive_mtx;
};

我们可以看到AddScore函数内部调用了QueryStudent, 所以采用了递归锁。

但是我们同样可以改变设计,将两者公有的部分抽离出来生成一个新的接口AddScoreAtomic.

AddScoreAtomic可以不适用递归锁,照样能完成线程安全操作的目的。

总结

本文介绍了unique_lock,共享锁,递归锁等的使用,较为全面的介绍了这几种锁的使用场景和潜在风险。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

<1…8910…37>

370 posts
17 categories
21 tags
RSS
GitHub ZhiHu
© 2025 恋恋风辰 本站总访问量次 | 本站访客数人
Powered by Hexo
|
Theme — NexT.Muse v5.1.3