C++ 并发(1) 线程基础

简介

本文主要介绍线程的基本管控,包括线程的发起,等待,异常条件下如何等待以及后台运行等基础操作。

线程发起

线程发起顾名思义就是启动一个线程,C++11标准统一了线程操作,可以在定义一个线程变量后,该变量启动线程执行回调逻辑。如下即可发起一个线程

1
2
3
4
5
6
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);

线程等待

当我们启动一个线程后,线程可能没有立即执行,如果在局部作用域启动了一个线程,或者main函数中,很可能子线程没运行就被回收了,回收时会调用线程的析构函数,执行terminate操作。所以为了防止主线程退出或者局部作用域结束导致子线程被析构的情况,我们可以通过join,让主线程等待子线程启动运行,子线程运行结束后主线程再运行。

1
2
3
4
5
std::string hellostr = "hello world!";
//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);
//2 主线程等待子线程退出
t1.join();

仿函数作为参数

当我们用仿函数作为参数传递给线程时,也可以达到启动线程执行某种操作的含义

1
2
3
4
5
6
class background_task {
public:
void operator()(std::string str) {
std::cout << "str is " << str << std::endl;
}
};

如果采用如下方式启动函数,那一定会报错的。

1
2
std::thread t2(background_task());
t2.join();

因为编译器会将t2当成一个函数对象, 返回一个std::thread类型的值, 函数的参数为一个函数指针,该函数指针返回值为background_task, 参数为void。可以理解为如下

1
"std::thread (*)(background_task (*)())"

修改的方式很简单

1
2
3
4
5
6
7
//可多加一层()
std::thread t2((background_task()));
t2.join();

//可使用{}方式初始化
std::thread t3{ background_task() };
t3.join();

lambda表达式

lambda 表达式也可以作为线程的参数传递给thread

1
2
3
4
5
6

std::thread t4([](std::string str) {
std::cout << "str is " << str << std::endl;
}, hellostr);

t4.join();

线程detach

线程允许采用分离的方式在后台独自运行,C++ concurrent programing书中称其为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

struct func {
int& _i;
func(int & i): _i(i){}
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i;
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
};

void oops() {

int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread(myfunc);
//隐患,访问局部变量,局部变量可能会随着}结束而回收或随着主线程退出而回收
functhread.detach();
}

// detach 注意事项
oops();
//防止主线程退出过快,需要停顿一下,让子线程跑起来detach
std::this_thread::sleep_for(std::chrono::seconds(1));

上面的例子存在隐患,因为some_local_state是局部变量, 当oops调用结束后局部变量some_local_state就可能被释放了,而线程还在detach后台运行,容易出现崩溃。
所以当我们在线程中使用局部变量的时候可以采取几个措施解决局部变量的问题

  • 通过智能指针传递参数,因为引用计数会随着赋值增加,可保证局部变量在使用期间不被释放,这也就是我们之前提到的伪闭包策略。
  • 将局部变量的值作为参数传递,这么做需要局部变量有拷贝复制的功能,而且拷贝耗费空间和效率。
  • 将线程运行的方式修改为join,这样能保证局部变量被释放前线程已经运行结束。但是这么做可能会影响运行逻辑。
    比如下面的修改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void use_join() {
    int some_local_state = 0;
    func myfunc(some_local_state);
    std::thread functhread(myfunc);
    functhread.join();
    }

    // join 用法
    use_join();

    异常处理

当我们启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行一些重要的操作比如将充值信息入库等,丢失这些信息是很危险的。所以常用的做法是捕获异常,并且在异常情况下保证子线程稳定运行结束后,主线程抛出异常结束运行。如下面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void catch_exception() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread{ myfunc };
try {
//本线程做一些事情,可能引发崩溃
std::this_thread::sleep_for(std::chrono::seconds(1));
}catch (std::exception& e) {
functhread.join();
throw;
}

functhread.join();
}

但是用这种方式编码,会显得臃肿,可以采用RAII技术,保证线程对象析构的时候等待线程运行结束,回收资源。如果大家还记得我基于asio实现异步服务时,逻辑处理类LogicSystem的析构函数里等待线程退出。那我们写一个简单的线程守卫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class thread_guard {
private:
std::thread& _t;
public:
explicit thread_guard(std::thread& t):_t(t){}
~thread_guard() {
//join只能调用一次
if (_t.joinable()) {
_t.join();
}
}

thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};

可以这么使用

1
2
3
4
5
6
7
8
9
10
void auto_guard() {
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
//本线程做一些事情
std::cout << "auto guard finished " << std::endl;
}

auto_guard();

慎用隐式转换

C++中会有一些隐式转换,比如char* 转换为string等。这些隐式转换在线程的调用上可能会造成崩溃问题

1
2
3
4
5
6
7
8
9
10
void danger_oops(int som_param) {
char buffer[1024];
sprintf(buffer, "%i", som_param);
//在线程内部将char const* 转化为std::string
//指针常量 char const* 指针本身不能变
//常量指针 const char * 指向的内容不能变
std::thread t(print_str, 3, buffer);
t.detach();
std::cout << "danger oops finished " << std::endl;
}

当我们定义一个线程变量thread t时,传递给这个线程的参数buffer会被保存到thread的成员变量中。
而在线程对象t内部启动并运行线程时,参数才会被传递给调用函数print_str
而此时buffer可能随着}运行结束而释放了。
改进的方式很简单,我们将参数传递给thread时显示转换为string就可以了,
这样thread内部保存的是string类型。

1
2
3
4
5
6
void safe_oops(int some_param) {
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(print_str, 3, std::string(buffer));
t.detach();
}

关于为什么参数会像我说的这样保存和调用,我在之后会按照源码给大家讲一讲。

引用参数

当线程要调用的回调函数参数为引用类型时,需要将参数显示转化为引用对象传递给线程的构造函数,
如果采用如下调用会编译失败

1
2
3
4
5
6
7
8
9
10
11
12

void change_param(int& param) {
param++;
}

void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, some_param);
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

即使函数change_param的参数为int&类型,我们传递给t2的构造函数为some_param,也不会达到在change_param函数内部修改关联到外部some_param的效果。因为some_param在传递给thread的构造函数后会转变为右值保存,右值传递给一个左值引用会出问题,所以编译出了问题。
改为如下调用就可以了

1
2
3
4
5
6
7
void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, std::ref(some_param));
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

thread原理

为了详细说明thread参数传递和调用原理,我们看看源码

1
2
3
4
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}

thread构造函数内部通过forward原样转换传递给_Start函数。关于原样转换的知识可以看我之前写的文章。
_Start 函数内部就是启动了一个线程_beginthreadex执行回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

#pragma warning(push)
#pragma warning(disable : 5039) // pointer or reference to potentially throwing function passed to
// extern C function under -EHc. Undefined behavior may occur
// if this function throws an exception. (/Wall)
_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));
#pragma warning(pop)

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

我们对应ref_oops内部函数的调用,_Start的参数就是void _Start(change_param&& _Fx, int&& Ax)

_beginthreadex函数参数分别是安全参数,栈大小,调用函数地址,调用函数参数,初始标记,第三方参数地址。
我们关注_beginthreadex的调用函数和参数,调用函数为_Invoker_proc,参数为_Decay_copied
我们看看这两个变量的定义

1
2
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

_Decay_copied 可以理解为

1
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<change_param>(_Fx), _STD forward<int>

_Invoker_proc 可以理解为封装的可调用对象

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(int)>{});

我们做一个简化

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<2>{});

其实就是将参数的索引0,1按照序列传递给_Get_invoke
_Get_invoke原型为

1
2
3
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}

所以_Get_invoke函数原型为

1
2
3
4
5
6
7
8
9
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
// adapt invoke of user's callable object to _beginthreadex's thread procedure
const unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
_STD invoke(_STD move(_STD get<_Indices>(_Tup))...);
_Cnd_do_broadcast_at_thread_exit(); // TRANSITION, ABI
return 0;
}

所以我们可以理解为调用_Get_invoke就是调用invoke(_STD move(_STD get<_Indices>(_Tup))...);
invoke(_STD move(_STD get<_Indices>(_Tup))...);就是将回调函数和参数传递给invoke

1
2
3
CONSTEXPR17 auto invoke(_Callable&& _Obj, _Ty1&& _Arg1, _Types2&&... _Args2) noexcept(
noexcept(_Invoker1<_Callable, _Ty1>::_Call(
static_cast<_Callable&&>(_Obj), static_cast<_Ty1&&>(_Arg1), static_cast<_Types2&&>(_Args2)...)))

invoke实际就是调用了_Call函数,_Call的作用就是调用回调函数,并传递给回调函数参数,可以理解为向change_param
传递int类型的右值数据

1
change_param(int&& _Arg1)

这与change_param的定义不符合,change_param参数为左值引用, 不能绑定右值,也就是编译错误的原因。

所以在这里大家就理解了传递给thread 的参数都是按照右值的方式构造为Tuple类型,传递给系统级别函数_beginthreadex调用的。
那为什么使用std::ref就可以实现引用效果呢?
这里我们看下std::ref的源码

1
2
3
4
template <class _Ty>
_NODISCARD _CONSTEXPR20 reference_wrapper<_Ty> ref(_Ty& _Val) noexcept {
return reference_wrapper<_Ty>(_Val);
}

reference_wrapper是一个类类型,说白了就是将参数的地址和类型保存起来。

1
2
3
4
_CONSTEXPR20 reference_wrapper(_Uty&& _Val) noexcept(noexcept(_Refwrap_ctor_fun<_Ty>(_STD declval<_Uty>()))) {
_Ty& _Ref = static_cast<_Uty&&>(_Val);
_Ptr = _STD addressof(_Ref);
}

当我们要使用这个类对象时,自动转化为取内部参数的地址里的数据即可,就达到了和实参关联的效果

1
2
3
4
5
6
7
 _CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}

_NODISCARD _CONSTEXPR20 _Ty& get() const noexcept {
return *_Ptr;
}

所以我们可以这么理解传递给thread对象构造函数的参数,仍然作为右值被保存,如ref(int)实际是作为reference_wrapper(int)对象保存在threa的类成员里。
而调用的时候触发了仿函数()进而获取到外部实参的地址内的数据。

绑定类成员函数

有时候我们需要绑定一个类的成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
class X
{
public:
void do_lengthy_work() {
std::cout << "do_lengthy_work " << std::endl;
}
};

void bind_class_oops() {
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);
t.join();
}

这里大家注意一下,如果thread绑定的回调函数是普通函数,可以在函数前加&或者不加&,因为编译器默认将普通函数名作为函数地址,如下两种写法都正确

1
2
3
4
5
6
7
8
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

std::string hellostr = "hello world!";
//两种方式都正确
std::thread t1(thead_work1, hellostr);
std::thread t2(&thead_work1, hellostr);

但是如果是绑定类的成员函数,必须添加&

使用move操作

有时候传递给线程的参数是独占的,所谓独占就是不支持拷贝赋值和构造,但是我们可以通过std::move的方式将参数的所有权转移给线程,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void deal_unique(std::unique_ptr<int> p) {
std::cout << "unique ptr data is " << *p << std::endl;
(*p)++;

std::cout << "after unique ptr data is " << *p << std::endl;
}

void move_oops() {
auto p = std::make_unique<int>(100);
std::thread t(deal_unique, std::move(p));
t.join();
//不能再使用p了,p已经被move废弃
// std::cout << "after unique ptr data is " << *p << std::endl;
}

总结

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn