C++ 并发(2) 线程管控

简介

本节介绍C++线程管控,包括移交线程的归属权,线程并发数量控制以及获取线程id等基本操作。

线程归属权

我们之前介绍了线程可以通过detach在后台运行或者让开辟这个线程的父线程等待该线程完成。
但每个线程都应该有其归属权,也就是归属给某个变量管理。比如

1
2
3
4
5
void some_function() {

}

std::thread t1(some_function);

t1是一个线程变量,管理一个线程,该线程执行some_function()
对于std::thread C++ 不允许其执行拷贝构造和拷贝赋值, 所以只能通过移动和局部变量返回的方式将线程变量管理的线程转移给其他变量管理。
C++ 中类似的类型还有std::mutex, std::ifstream, std::unique_ptr
比如下面,我们说明了线程归属权的转移方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

void some_other_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

//t1 绑定some_function
std::thread t1(some_function);
//2 转移t1管理的线程给t2,转移后t1无效
std::thread t2 = std::move(t1);
//3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
//4 创建一个线程变量t3
std::thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
//6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));

上面的代码会引发崩溃,是因为步骤6造成的崩溃。
让主函数睡眠2000秒,是为了告诉规避主函数退出引发崩溃的问题,因为我们在之前给大家演示过,如果线程不detach或者join,主线程退出时会引发崩溃,而我们这些线程没有join和detach,为了给大家演示是因为步骤6引发的崩溃,所以让主线程睡眠2000秒暂时不退出,但是程序仍然会崩溃,说明是步骤6导致的崩溃。

上面代码将t2管理的线程交给t3
之后将t3管理的线程交给t1,此时t1管理线程运行着 some_function
步骤6导致崩溃的原因就是将t3管理的线程交给t1,而此时t1正在管理线程运行some_other_function
所以我们可以得出一个结论,就是不要将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。

std::unique_ptr一样,我们可以在函数内部返回一个局部的std::thread变量。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::thread  f() {
return std::thread(some_function);
}

void param_function(int a) {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

std::thread g() {
std::thread t(param_function, 43);
}

因为C++ 在返回局部变量时,会优先寻找这个类的拷贝构造函数,如果没有就会使用这个类的移动构造函数。

joining_thread

曾经有一份C++17标准的备选提案,主张引入新的类joining_thread,它与std::thread类似,但只要其执行析构函数,线程即能自动汇合,这点与scoped_thread非常像。可惜C++标准委员会未能达成共识,结果C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。除去这些,实际上joining_thread类的代码相对容易编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}

joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}


~joining_thread() noexcept {
if (joinable()) {
join();
}
}

void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}

std::thread::id get_id() const noexcept {
return _t.get_id();
}

bool joinable() const noexcept {
return _t.joinable();
}

void join() {
_t.join();
}

void detach() {
_t.detach();
}

std::thread& as_thread() noexcept {
return _t;
}

const std::thread& as_thread() const noexcept {
return _t;
}
};

使用起来比较简单,我们直接构造一个joining_thread对象即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void use_jointhread() {
//1 根据线程构造函数构造joiningthread
joining_thread j1([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10);

//2 根据thread构造joiningthread
joining_thread j2(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));

//3 根据thread构造j3
joining_thread j3(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));


//4 把j3赋值给j1,joining_thread内部会等待j1汇合结束后
//再将j3赋值给j1
j1 = std::move(j3);
ervice
}

容器存储

容器存储线程时,比如vector,如果用push_back操作势必会调用std::thread,这样会引发编译错误,因为std::thread没有拷贝构造函数。我们在之前网络编程实现IOServicePool或者IOThreadPool时初始化了多个线程存储在vector中, 采用的时emplace方式,可以直接根据线程构造函数需要的参数构造,这样就避免了调用thread的拷贝构造函数。 类似于这种

1
2
3
4
5
6
7
8
9
10
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
threads.emplace_back(param_function, i);
}

for (auto& entry : threads) {
entry.join();
}
}

选择运行数量

借用C++标准库的std::thread::hardware_concurrency()函数,它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量.
我们可以模拟实现一个并行计算的功能,计算容器内所有元素的和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init; //⇽-- - ①
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread; //⇽-- - ②
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); //⇽-- - ③
unsigned long const block_size = length / num_threads; //⇽-- - ④
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - ⑤
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size); //⇽-- - ⑥
threads[i] = std::thread(//⇽-- - ⑦
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; //⇽-- - ⑧
}
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]); //⇽-- - ⑨

for (auto& entry : threads)
entry.join(); //⇽-- - ⑩
return std::accumulate(results.begin(), results.end(), init); //⇽-- - ⑪
}

void use_parallel_acc() {
std::vector <int> vec(10000);
for (int i = 0; i < 10000; i++) {
vec.push_back(i);
}
int sum = 0;
sum = parallel_accumulate<std::vector<int>::iterator, int>(vec.begin(),
vec.end(), sum);

std::cout << "sum is " << sum << std::endl;
}

上面的代码1处判断要计算的容器内元素为0个则返回。

2处计算最大开辟的线程数,我们预估每个线程计算25个数据长度。

但是我们可以通过std::thread::hardware_concurrency返回cpu的核数,我们期待的是开辟的线

程数小于等于cpu核数,这样才不会造成线程过多时间片切换开销。

所以3处计算了适合开辟线程数的最小值。

4处计算了步长,根据步长移动迭代器然后开辟线程计算。

5处初始化了线程数-1个大小的vector,因为主线程也参与计算,所以这里-1.

6处移动步长,7处开辟线程,8处更新起始位置。

9处为主线程计算。

10 处让所有线程join

11 处最后将所有计算结果再次调用std的accumulate算出结果。

识别线程

所谓识别线程就是获取线程id,可以根据线程id是否相同判断是否同一个线程。
比如我们启动了一个线程,我们可以通过线程变量的get_id()获取线程id

1
2
3
4
5
std::thread t([](){
std::cout << "thread start" << std::endl;
});

t.get_id();

但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这总方式

1
2
3
4
5
std::thread t([](){
std::cout << "in thread id " <<
std::this_thread::get_id() << std::endl;
std::cout << "thread start" << std::endl;
});

总结

本文介绍了线程管控相关的知识,包括线程的join,detach,以及识别线程,归属权转移,如何管理等等。

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn