thread,async源码解析

简介

本文件汇总粉丝提出的关于并发的几个问题,做个备份,方便大家理解和学习。

局部变量返回值

关于局部变量返回值的问题我曾在视频中说会通过构造函数返回一个局部变量给调用者,编译器会先执行拷贝构造,如果没有拷贝构造再寻找移动构造。这么说是有问题的。
有热心的粉丝查询了chatgpt,当函数返回一个类类型的局部变量时会先调用移动构造,如果没有移动构造再调用拷贝构造。
所以对于一些没有拷贝构造但是实现了移动构造的类类型也支持通过函数返回局部变量。
在 C++11 之后,编译器会默认使用移动语义(move semantics)来提高性能

看个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class TestCopy {
public:
TestCopy(){}
TestCopy(const TestCopy& tp) {
std::cout << "Test Copy Copy " << std::endl;
}
TestCopy(TestCopy&& cp) {
std::cout << "Test Copy Move " << std::endl;
}
};

TestCopy TestCp() {
TestCopy tp;
return tp;
}

main 函数中调用TestCp

1
2
3
4
int main(){
TestCp();
return 0;
}

发现打印的是”Test Copy Move” .这说明优先调用的是移动构造,这也提醒我们,如果我们自定义的类实现了拷贝构造和移动构造,而这个类的移动给构造和拷贝构造实现方式不同时,要注意通过函数内部局部变量返回该类时调用移动构造是否会存在一些逻辑或安全的问题。

优先按照移动构造的方式返回局部的类对象,有一个好处就是可以返回一些只支持移动构造的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::unique_ptr<int> ReturnUniquePtr() {
std::unique_ptr<int> uq_ptr = std::make_unique<int>(100);
return uq_ptr;
}

std::thread ReturnThread() {
std::thread t([]() {
int i = 0;
while (true) {
std::cout << "i is " << i << std::endl;
i++;
if (i == 5) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});

return t;
}

main函数中调用后,可以看到线程和unique_ptr都可被函数作为局部变量返回,而且返回的线程可以继续运行。

1
2
3
4
5
6
7
int main(){
auto rt_ptr = ReturnUniquePtr();
std::cout << "rt_ptr value is " << *rt_ptr << std::endl;
std::thread rt_thread = ReturnThread();
rt_thread.join();
return 0;
}

线程归属权问题

有粉丝反馈在使用thread时遇到崩溃,主要原因在于线程归属权没有理解好,我们不能将一个线程的归属权转移给一个已经绑定线程的变量。

比如下面的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ThreadOp() {

std::thread t1([]() {
int i = 0;
while (i < 5) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});

std::thread t2([]() {
int i = 0;
while (i < 10) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});

//不能将一个线程归属权绑定给一个已经绑定线程的变量,否则会触发terminate导致崩溃
t1 = std::move(t2);
t1.join();
t2.join();
}

我们在主函数中执行上述函数,会触发崩溃如下图

https://cdn.llfc.club/1697103423170.jpg

t1已经绑定了一个线程执行循环操作直到i<5。如果在t1没运行完的情况下将t2的归属权给t1,则会引发terminate崩溃错误。

具体原因我们可以看看thread在做移动赋值时的源码

1
2
3
4
5
6
7
8
thread& operator=(thread&& _Other) noexcept {
if (joinable()) {
_STD terminate();
}

_Thr = _STD exchange(_Other._Thr, {});
return *this;
}

在线程joinable()返回true时,会触发terminate()操作,也就是被赋值的线程没有被join过,此时执行operator=操作会导致terminate()
至于terminate()实现比较简单

1
_ACRTIMP __declspec(noreturn) void __cdecl terminate() throw();

可以看到terminate()就是抛出异常。

所以我们在之前的课程封装了了自动join的线程类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}

joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}


~joining_thread() noexcept {
if (joinable()) {
join();
}
}

void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}

std::thread::id get_id() const noexcept {
return _t.get_id();
}

bool joinable() const noexcept {
return _t.joinable();
}

void join() {
_t.join();
}

void detach() {
_t.detach();
}

std::thread& as_thread() noexcept {
return _t;
}

const std::thread& as_thread() const noexcept {
return _t;
}
};

thread参数值拷贝

之前讲到构造std::thread对象传递回调函数和参数,回调函数的参数绑定都是值拷贝的方式,这里再梳理一次
下面是thread的构造函数

1
2
3
4
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}

构造函数内调用了_Start函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
// 1 处
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
// 2 处
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
// 3 处
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

#pragma warning(push)
#pragma warning(disable : 5039) // pointer or reference to potentially throwing function passed to
// extern C function under -EHc. Undefined behavior may occur
// if this function throws an exception. (/Wall)
// 4处
_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));
#pragma warning(pop)

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

我们从上面的代码 1处 可以看到_Tuple是一个去引用的类型,因为其内部存储的都是decay_t过后的类型,所以无论左值引用还是右值引用到这里都变为去引用的类型。

所以2处就是将参数和函数按照值拷贝的方式存在tuple中。

3处定义了一个可调用对象_Invoker_proc

4处启动线程调用_Invoker_proc进而调用我们传递的回调函数和参数。

所以综上所述,std::thread向回调函数传递值是以副本的方式,回调函数参数是引用类型,可以将传递的实参用std::ref包装达到修改的效果。
因为std::ref其实是构造了reference_wrapper类对象,这个类实现了仿函数

1
2
3
_CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}

所以当线程接收std::ref包裹的参数时会调用仿函数通过指针解引用的方式获取外部实参,以_Ty&返回,从而达到修改的效果。

那么如下调用就会报错,提示“invoke”: 未找到匹配的重载函数。

1
2
3
4
5
6
7
8
void ChangeValue() {
int m = 100;
std::thread t1{ [](int& rm) {
rm++;
}, m };

t1.join();
}

因为 invoke函数调用时会将参数以右值的方式移动给回调函数,这会造成左值引用绑定右值的情况,所以编译报错。

改为下面这样写就没问题了

1
2
3
4
5
6
7
8
void ChangeValue() {
int m = 100;
std::thread t1{ [](int& rm) {
rm++;
}, std::ref(m) };

t1.join();
}

async注意事项

部分粉丝反馈async不能像js那样实现完全的纯异步,确实是存在这样的情况,因为于js不同,js是单线程的,而C++需要关注线程的生命周期。

我们使用async时,其实其内部调用了thread,pacakged_task,future等机制。async会返回一个future这个future如果会在被析构时等待其绑定的线程任务是否执行完成。

我们看一段cppreference.com中的描述

https://cdn.llfc.club/1697109536918.jpg

“The creator of the asynchronous operation can then use a variety of methods to query, wait for, or extract a value from the std::future. These methods may block if the asynchronous operation has not yet provided a value.”

异步操作async返回std::future, 调用者可以通过query,wait for等方式从std::future中查询状态。
但是如果async直接调用而不适用返回值则可能会阻塞。如下例子

1
2
3
4
5
6
7
8
9
10
void BlockAsync() {
std::cout << "begin block async" << std::endl;
{
std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "std::async called " << std::endl;
});
}
std::cout << "end block async" << std::endl;
}

我们在主函数调用BlockAsync(), 发现async并没有异步执行任务,而是按次序输出如下

1
2
3
begin block async
std::async called
end block async

因为async返回一个右值类型的future,无论左值还是右值,future都要被析构,因为其处于一个局部作用域{}中。
当编译器执行到}时会触发future析构。但是future析构要保证其关联的任务完成,所以需要等待任务完成future才被析构,
所以也就成了串行的效果了。

所以C++ 官方文档说 如果调用析构函数的那个future是某一shared state的最后持有者,而相关的task已启动但尚未结束,析构函数会造成阻塞,直到任务结束

至于为什么future析构要等到其关联的任务完成我们可以看一下async源码

1
2
3
4
5
6
7
8
9
10
11
12
13
template <class _Fty, class... _ArgTypes>
_NODISCARD future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>> async(
launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args) {
// manages a callable object launched with supplied policy
using _Ret = _Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>;
using _Ptype = typename _P_arg_type<_Ret>::type;
//1 处
_Promise<_Ptype> _Pr(
_Get_associated_state<_Ret>(_Policy, _Fake_no_copy_callable_adapter<_Fty, _ArgTypes...>(
_STD forward<_Fty>(_Fnarg), _STD forward<_ArgTypes>(_Args)...)));
//2 处
return future<_Ret>(_Pr._Get_state_for_future(), _Nil());
}

我们先看看_Get_associated_state的源码

1
2
3
4
5
6
7
8
9
10
11
template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(
launch _Psync, _Fty&& _Fnarg) { // construct associated asynchronous state object for the launch type
switch (_Psync) { // select launch type
case launch::deferred:
return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
case launch::async: // TRANSITION, fixed in vMajorNext, should create a new thread here
default:
return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
}
}

_Get_associated_state 做的事情很简单,根据我们不同的策略deferred还是async去构造不同的异步状态。如果是launch::async策略,我们创建一个
_Task_async_state类型的指针对象,我们将这个指针转化为_Associated_state指针返回,_Associated_state_Task_async_state的基类。

async内 1处用该返回值构造了一个_Promise<_Ptype>类型的对象_Pr

async内 2处 用_Pr._Get_state_for_future()返回值构造了future,该返回值是_State_manager<_Ty>类型对象。

因为future继承于_State_manager,所以_Pr._Get_state_for_future()返回的值主要用来构造future的基类。

析构future时要析构future子类再析构其基类,future本身的析构没有什么,而其基类_State_manager<_Ty>析构时调用如下

1
2
3
4
5
~_State_manager() noexcept {
if (_Assoc_state) {
_Assoc_state->_Release();
}
}

看源码我们知道_Assoc_state_Associated_state<_Ty> *类型

1
_Associated_state<_Ty>* _Assoc_state;

_Assoc_state * 就是我们之前在_Get_associated_state中开辟并返回的_Task_async_state*类型转化的。

我们沿着_Assoc_state->_Release一路追踪,会发现最终调用了下面的代码

1
2
3
4
5
6
7
void _Delete_this() { // delete this object
if (_Deleter) {
_Deleter->_Delete(this);
} else {
delete this;
}
}

如果没有删除器则会直接调用delete this, 会调用_Assoc_state的析构函数,因其析构函数为虚析构,进而调用_Task_async_state的析构函数

所以我们 ~_State_manager()调用的其实是_Task_async_state的析构函数, 我们看一下_Task_async_state的析构函数源码

1
2
3
4
5
6
7
virtual ~_Task_async_state() noexcept {
_Wait();
}

virtual void _Wait() override { // wait for completion
_Task.wait();
}

从源码中可以看到_Task_async_state 被析构时会等待任务完成,这也就是future需等待任务完成后才析构的原因。

仅仅介绍这个事情不是我得初衷,我们介绍一种更为隐晦的死锁情况, 看下面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void DeadLock() {
std::mutex mtx;
std::cout << "DeadLock begin " << std::endl;
std::lock_guard<std::mutex> dklock(mtx);
{
std::future<void> futures = std::async(std::launch::async, [&mtx]() {
std::cout << "std::async called " << std::endl;
std::lock_guard<std::mutex> dklock(mtx);
std::cout << "async working...." << std::endl;
});
}

std::cout << "DeadLock end " << std::endl;
}

上面函数的作用意图在主线程中先执行加锁,再通过async启动一个线程异步执行任务,执行的任务与主线程互斥,所以在lambda表达式中加锁。
但是这么做会造成死锁,因为主线程输出”DeadLock begin “加锁,此时async启动一个线程,那么lambda表达式会先输出”std::async called “.
但是在子线程中无法加锁成功,因为主线程没有释放锁。而主线程无法释放锁,因为主线程要等待async执行完。
因为我们上面提到过,futures处于局部作用域,即将析构,而析构又要等待任务完成,任务需要加锁,所以永远完成不了,这样就死锁了。

所以使用async要注意其返回的future是不是shared state的最后持有者。

这里有个粉丝问道能不能用async实现这样的需求

  1. 你的需求是func1 中要异步执行asyncFunc函数。
  2. func2中先收集asyncFunc函数运行的结果,只有结果正确才执行
  3. func1启动异步任务后继续执行,执行完直接退出不用等到asyncFunc运行完

如果我们理解了async的原理后不难实现如下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int asyncFunc() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "this is asyncFunc" << std::endl;
return 0;
}

void func1(std::future<int>& future_ref) {
std::cout << "this is func1" << std::endl;
future_ref = std::async(std::launch::async, asyncFunc);
}

void func2(std::future<int>& future_ref) {
std::cout << "this is func2" << std::endl;
auto future_res = future_ref.get();
if (future_res == 0) {
std::cout << "get asyncFunc result success !" << std::endl;
}
else {
std::cout << "get asyncFunc result failed !" << std::endl;
return;
}
}

//提供多种思路,这是第一种
void first_method() {
std::future<int> future_tmp;
func1(future_tmp);
func2(future_tmp);
}

上面的例子我们保证在func1func2使用的是future的引用即可。这样func1内不会因为启动async而阻塞,因为future_ref不是shared state最后持有者。

如果真的想实现一个纯异步的操作倒也不难,可以这样实现

1
2
3
4
5
6
7
8
9
10
11
12
template<typename Func, typename... Args  >
auto ParallenExe(Func&& func, Args && ... args) -> std::future<decltype(func(args...))> {
typedef decltype(func(args...)) RetType;
std::function<RetType()> bind_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
std::packaged_task<RetType()> task(bind_func);
auto rt_future = task.get_future();
std::thread t(std::move(task));

t.detach();

return rt_future;
}

上面的函数ParallenExe内部我们通过bind操作将函数和参数绑定,生成一个返回值为RetType类型,参数为void的函数bind_func
接着我们用这个函数生成了一个packaged_task类型的对象task,这个task获取future留作以后函数结束时返回。
我们启动了一个线程处理这个task,并将这个线程detach,保证其分离独立运行。返回的rt_future并不是shared state最后持有者,因为task内部也会持有
shared_state,引用计数并不会变为0,所以并不会触发如下析构

1
2
3
4
5
void _Release() { // decrement reference count and destroy when zero
if (_MT_DECR(_Refs) == 0) {
_Delete_this();
}
}

那么我们写一个函数测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestParallen1() {
int i = 0;
std::cout << "Begin TestParallen1 ..." << std::endl;
{
ParallenExe([](int i) {
while (i < 3) {
i++;
std::cout << "ParllenExe thread func " << i << " times" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, i);
}

std::cout << "End TestParallen1 ..." << std::endl;
}

在上面的函数中我们有意让ParallenExe放在一个局部的{}中执行,意在局部作用域结束后ParallenExe返回的future引用计数-1,以此证明其引用计数是否为0,

如果引用计数为0,则会执行future的析构进而等待任务执行完成,那么看到的输出将是

1
2
3
4
5
Begin TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times
End TestParallen1 ...

如果引用计数不会为0,则不会执行future的析构函数,那么看到的输出是这样的

1
2
3
4
5
Begin TestParallen1 ...
End TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times

我们在main函数中调用做测试, 因为要防止主线程过早退出,所以我们先让主线程睡眠4秒

1
2
3
4
5
6
int main()
{
TestParallen1();
std::this_thread::sleep_for(std::chrono::seconds(4));
std::cout << "Main Exited!\n";
}

而事实证明是第二种,输出如下

1
2
3
4
5
Begin TestParallen1 ...
End TestParallen1 ...
ParllenExe thread func 1 times
ParllenExe thread func 2 times
ParllenExe thread func 3 times

由此可见我们实现了不会阻塞的并发函数,但是也会存在一些顾虑,比如我们的主线程如果不睡眠4秒,那很可能主线程退出了子线程的任务没执行完而被强制回收。
所以归根结底,这种方式我们也需要在合适的时候等待汇合,比如调用future的get或wait操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TestParallen2() {
int i = 0;
std::cout << "Begin TestParallen2 ..." << std::endl;

auto rt_future = ParallenExe([](int i) {
while (i < 3) {
i++;
std::cout << "ParllenExe thread func " << i << " times" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, i);

std::cout << "End TestParallen2 ..." << std::endl;

rt_future.wait();
}

这就是我说的,归根结底C++和js的体系不一样,C++要管理开辟的线程生命周期,我们总归要在合适的时机汇合。
所以std::async会返回future, future会判断是不是最后持有的shared_state进而帮我们做汇合操作,这并不是缺陷而是安全性的保证。至于我们不想在该处汇合,只要保证该处future不会是最后持有shared_state的即可。

总结

本文介绍了threadasync的源码级分析,回答了大家常遇到的问题。

详细源码

https://gitee.com/secondtonone1/boostasio-learn/tree/master/concurrent/day09-QASummary

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290