恋恋风辰的个人博客


  • Home

  • Archives

  • Categories

  • Tags

  • Search

C++ 并发三剑客future, promise和async

Posted on 2023-09-17 | In C++

简介

本文介绍C++ 并发三剑客, future, promise以及async用法。这三个类是实现并发技术的关键,接下来详细介绍一下

async用法

std::async 是一个用于异步执行函数的模板函数,它返回一个 std::future 对象,该对象用于获取函数的返回值。

以下是一个使用 std::async 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <future>
#include <chrono>

// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {
// 模拟一个异步任务,比如从数据库中获取数据
std::this_thread::sleep_for(std::chrono::seconds(5));
return "Data: " + query;
}

int main() {
// 使用 std::async 异步调用 fetchDataFromDB
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");

// 在主线程中做其他事情
std::cout << "Doing something else..." << std::endl;

// 从 future 对象中获取数据
std::string dbData = resultFromDB.get();
std::cout << dbData << std::endl;

return 0;
}

在这个示例中,std::async 创建了一个新的线程(或从内部线程池中挑选一个线程)并自动与一个 std::promise 对象相关联。std::promise 对象被传递给 fetchDataFromDB 函数,函数的返回值被存储在 std::future 对象中。在主线程中,我们可以使用 std::future::get 方法从 std::future 对象中获取数据。注意,在使用 std::async 的情况下,我们必须使用 std::launch::async 标志来明确表明我们希望函数异步执行。

上面的例子输出

1
2
Doing something else...
Data: Data

async的启动策略

std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义。除了std::launch::async之外,还有以下启动策略:

  1. std::launch::deferred:这种策略意味着任务将在调用std::future::get()或std::future::wait()函数时延迟执行。换句话说,任务将在需要结果时同步执行。
  2. std::launch::async | std::launch::deferred:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。

默认情况下,std::async使用std::launch::async | std::launch::deferred策略。这意味着任务可能异步执行,也可能延迟执行,具体取决于实现。需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。

future的wait和get

std::future::get() 和 std::future::wait() 是 C++ 中用于处理异步任务的两个方法,它们的功能和用法有一些重要的区别。

  1. std::future::get():

std::future::get() 是一个阻塞调用,用于获取 std::future 对象表示的值或异常。如果异步任务还没有完成,get() 会阻塞当前线程,直到任务完成。如果任务已经完成,get() 会立即返回任务的结果。重要的是,get() 只能调用一次,因为它会移动或消耗掉 std::future 对象的状态。一旦 get() 被调用,std::future 对象就不能再被用来获取结果。
2. std::future::wait():

std::future::wait() 也是一个阻塞调用,但它与 get() 的主要区别在于 wait() 不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait() 会立即返回。如果任务还没有完成,wait() 会阻塞当前线程,直到任务完成。与 get() 不同,wait() 可以被多次调用,它不会消耗掉 std::future 对象的状态。

总结一下,这两个方法的主要区别在于:

  • std::future::get() 用于获取并返回任务的结果,而 std::future::wait() 只是等待任务完成。
  • get() 只能调用一次,而 wait() 可以被多次调用。
  • 如果任务还没有完成,get() 和 wait() 都会阻塞当前线程,但 get() 会一直阻塞直到任务完成并返回结果,而 wait() 只是在等待任务完成。

你可以使用std::future的wait_for()或wait_until()方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status值。

1
2
3
4
5
if(fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {  
// 操作已完成
} else {
// 操作尚未完成
}

将任务和future关联

std::packaged_task和std::future是C++11中引入的两个类,它们用于处理异步任务的结果。

std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用。

std::future代表一个异步操作的结果。它可以用于从异步任务中获取返回值或异常。

以下是使用std::packaged_task和std::future的基本步骤:

  1. 创建一个std::packaged_task对象,该对象包装了要执行的任务。
  2. 调用std::packaged_task对象的get_future()方法,该方法返回一个与任务关联的std::future对象。
  3. 在另一个线程上调用std::packaged_task对象的operator(),以执行任务。
  4. 在需要任务结果的地方,调用与任务关联的std::future对象的get()方法,以获取任务的返回值或异常。

以下是一个简单的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}

void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task);

// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();

// 在另一个线程上执行任务
std::thread t(std::move(task));
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成

// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;

}

在上面的示例中,我们创建了一个包装了任务的std::packaged_task对象,并获取了与任务关联的std::future对象。然后,我们在另一个线程上执行任务,并等待任务完成并获取结果。最后,我们输出结果。

我们可以使用 std::function 和 std::package_task 来包装带参数的函数。std::package_task 是一个模板类,它包装了一个可调用对象,并允许我们将其作为异步任务传递。

promise 用法

C++11引入了std::promise和std::future两个类,用于实现异步编程。std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。

下面是std::promise的基本用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <future>

void set_value(std::promise<int> prom) {
// 设置 promise 的值
prom.set_value(10);
}

int main() {
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
std::future<int> fut = prom.get_future();
// 在新线程中设置 promise 的值
std::thread t(set_value, std::move(prom));
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
return 0;
}

程序输出

1
2
3
Waiting for the thread to set the value...
promise set value successValue set by the thread:
10

在上面的代码中,我们首先创建了一个std::promise<int>对象,然后通过调用get_future()方法获取与之相关联的std::future<int>对象。然后,我们在新线程中通过调用set_value()方法设置promise的值,并在主线程中通过调用fut.get()方法获取这个值。注意,在调用fut.get()方法时,如果promise的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。

除了set_value()方法外,std::promise还有一个set_exception()方法,用于设置异常。该方法接受一个std::exception_ptr参数,该参数可以通过调用std::current_exception()方法获取。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include <iostream>
#include <thread>
#include <future>

void set_exception(std::promise<void> prom) {
try {
// 抛出一个异常
throw std::runtime_error("An error occurred!");
} catch(...) {
// 设置 promise 的异常
prom.set_exception(std::current_exception());
}
}

int main() {
// 创建一个 promise 对象
std::promise<void> prom;
// 获取与 promise 相关联的 future 对象
std::future<void> fut = prom.get_future();
// 在新线程中设置 promise 的异常
std::thread t(set_exception, std::move(prom));
// 在主线程中获取 future 的异常
try {
std::cout << "Waiting for the thread to set the exception...\n";
fut.get();
} catch(const std::exception& e) {
std::cout << "Exception set by the thread: " << e.what() << '\n';
}
t.join();
return 0;
}

上述代码输出

1
2
Waiting for the thread to set the exception...
Exception set by the thread: An error occurred!

当然我们使用promise时要注意一点,如果promise被释放了,而其他的线程还未使用与promise关联的future,当其使用这个future时会报错。如下是一段错误展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void use_promise_destruct() {
std::thread t;
std::future<int> fut;
{
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
fut = prom.get_future();
// 在新线程中设置 promise 的值
t = std::thread(set_value, std::move(prom));
}
// 在主线程中获取 future 的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
}

随着局部作用域}的结束,prom可能被释放也可能会被延迟释放,
如果立即释放则fut.get()获取的值会报error_value的错误。

共享类型的future

当我们需要多个线程等待同一个执行结果时,需要使用std::shared_future

以下是一个适合使用std::shared_future的场景,多个线程等待同一个异步操作的结果:

假设你有一个异步任务,需要多个线程等待其完成,然后这些线程需要访问任务的结果。在这种情况下,你可以使用std::shared_future来共享异步任务的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void myFunction(std::promise<int>&& promise) {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置 promise 的值
}

void threadFunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
catch (const std::future_error& e) {
std::cout << "Future error: " << e.what() << std::endl;
}
}

void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

// 使用 share() 方法获取新的 shared_future 对象

std::thread myThread2(threadFunction, future);

std::thread myThread3(threadFunction, future);

myThread1.join();
myThread2.join();
myThread3.join();
}

在这个示例中,我们创建了一个std::promise<int>对象promise和一个与之关联的std::shared_future<int>对象future。然后,我们将promise对象移动到另一个线程myThread1中,该线程将执行myFunction函数,并在完成后设置promise的值。我们还创建了两个线程myThread2和myThread3,它们将等待future对象的结果。如果myThread1成功地设置了promise的值,那么future.get()将返回该值。这些线程可以同时访问和等待future对象的结果,而不会相互干扰。

但是大家要注意,如果一个future被移动给两个shared_future是错误的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();

std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中

std::thread myThread2(threadFunction, std::move(future));
std::thread myThread3(threadFunction, std::move(future));

myThread1.join();
myThread2.join();
myThread3.join();
}

这种用法是错误的,一个future通过隐式构造传递给shared_future之后,这个shared_future被移动传递给两个线程是不合理的,因为第一次移动后shared_future的生命周期被转移了,接下来myThread3构造时用的std::move(future)future已经失效了,会报错,一般都是no state 之类的错误。

异常处理

std::future 是C++的一个模板类,它用于表示一个可能还没有准备好的异步操作的结果。你可以通过调用 std::future::get 方法来获取这个结果。如果在获取结果时发生了异常,那么 std::future::get 会重新抛出这个异常。

以下是一个例子,演示了如何在 std::future 中获取异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <iostream>
#include <future>
#include <stdexcept>
#include <thread>

void may_throw()
{
// 这里我们抛出一个异常。在实际的程序中,这可能在任何地方发生。
throw std::runtime_error("Oops, something went wrong!");
}

int main()
{
// 创建一个异步任务
std::future<void> result(std::async(std::launch::async, may_throw));

try
{
// 获取结果(如果在获取结果时发生了异常,那么会重新抛出这个异常)
result.get();
}
catch (const std::exception &e)
{
// 捕获并打印异常
std::cerr << "Caught exception: " << e.what() << std::endl;
}

return 0;
}

在这个例子中,我们创建了一个异步任务 may_throw,这个任务会抛出一个异常。然后,我们创建一个 std::future 对象 result 来表示这个任务的结果。在 main 函数中,我们调用 result.get() 来获取任务的结果。如果在获取结果时发生了异常,那么 result.get() 会重新抛出这个异常,然后我们在 catch 块中捕获并打印这个异常。

上面的例子输出

1
Caught exception: Oops, something went wrong!

线程池

我们可以利用上面提到的std::packaged_task和std::promise构建线程池,提高程序的并发能力。
先了解什么是线程池:

线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

线程池可以避免在处理短时间任务时创建与销毁线程的代价,它维护着多个线程,等待着监督管理者分配可并发执行的任务,从而提高了整体性能。

下面是我提供的一套线程池源码,目前用在公司的项目中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#ifndef __THREAD_POOL_H__
#define __THREAD_POOL_H__

#include <atomic>
#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

#include "Instrument/none_copyable.hpp"

class ThreadPool : private NoneCopyable {
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;

static ThreadPool& instance() {
static ThreadPool ins;
return ins;
}

using Task = std::packaged_task<void()>;

ThreadPool(unsigned int num = 15)
: stop_(false) {
{
if (num < 1)
thread_num_ = 1;
else
thread_num_ = num;
}
start();
}
~ThreadPool() {
stop();
}

template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load())
return std::future<RetType>{};

auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));

std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(cv_mt_);
tasks_.emplace([task] { (*task)(); });
}
cv_lock_.notify_one();
return ret;
}

int idleThreadCount() {
return thread_num_;
}

private:
void start() {
for (int i = 0; i < thread_num_; ++i) {
pool_.emplace_back([this]() {
while (!this->stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {
return this->stop_.load() || !this->tasks_.empty();
});
if (this->tasks_.empty())
return;

task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->thread_num_--;
task();
this->thread_num_++;
}
});
}
}
void stop() {
stop_.store(true);
cv_lock_.notify_all();
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}

private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
std::atomic_bool stop_;
std::atomic_int thread_num_;
std::queue<Task> tasks_;
std::vector<std::thread> pool_;
};

#endif // !__THREAD_POOL_H__

总结

本文介绍了如何使用future, promise以及async用法

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

利用条件变量构造线程安全队列

Posted on 2023-09-09 | In C++

简介

本文介绍如何使用条件变量控制并发的同步操作,试想有一个线程A一直输出1,另一个线程B一直输出2。我想让两个线程交替输出1,2,1,2…之类的效果,该如何实现?有的同学可能会说不是有互斥量mutex吗?可以用一个全局变量num表示应该哪个线程输出,比如num为1则线程A输出1,num为2则线程B输出2,mutex控制两个线程访问num,如果num和线程不匹配,就让该线程睡一会,这不就实现了吗?比如线程A加锁后发现当前num为2则表示它不能输出1,就解锁,将锁的使用权交给线程A,线程B就sleep一会。

不良实现

上面说的方式可以实现我们需要的功能,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
void PoorImpleman() {
std::thread t1([]() {
for (;;) {

{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));

}

});

std::thread t2([]() {
for (;;) {

{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));

}

});

t1.join();
t2.join();
}

PoorImpleman虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。所以我们提出了用条件变量来通知线程的机制,当线程A发现条件不满足时可以挂起,等待线程B通知,线程B通知A后,A被唤醒继续处理。

条件变量

我们这里用条件变量实现上面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void ResonableImplemention() {
std::thread t1([]() {
for (;;) {

std::unique_lock<std::mutex> lock(mtx_num);
cvA.wait(lock, []() {
return num == 1;
});

num++;
std::cout << "thread A print 1....." << std::endl;
cvB.notify_one();
}

});

std::thread t2([]() {
for (;;) {

std::unique_lock<std::mutex> lock(mtx_num);
cvB.wait(lock, []() {
return num == 2;
});

num--;
std::cout << "thread B print 2....." << std::endl;
cvA.notify_one();
}

});

t1.join();
t2.join();
}

当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。
这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,sleep的方式看出日志基本是每隔1秒才打印一次,效率不高。

线程安全队列

之前我们实现过线程安全的栈,对于pop操作,我们如果在线程中调用empty判断是否为空,如果不为空,则pop,因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。对于智能指针版本我们发现队列为空则返回空指针,对于引用版本,
发现队列为空则抛出异常,这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <queue>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};

threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
data_queue.push(data); ⇽--- ②
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data))
break;
}
}

我们可以启动三个线程,一个producer线程用来向队列中放入数据。一个consumer1线程用来阻塞等待pop队列中的元素。

另一个consumer2尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。

打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出

测试代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print;
std::thread producer(
[&]() {
for (int i = 0; ;i++) {
safe_que.push(i);
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}

std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);

std::thread consumer1(
[&]() {
for (;;) {
auto data = safe_que.wait_and_pop();
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << *data << std::endl;
}

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);

std::thread consumer2(
[&]() {
for (;;) {
auto data = safe_que.try_pop();
if (data != nullptr) {
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << *data << std::endl;
}

}

std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);

producer.join();
consumer1.join();
consumer2.join();
}

测试效果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
producer push data is 0
consumer1 wait and pop data is 0
producer push data is 1
producer push data is 2
consumer2 try_pop data is 1
consumer1 wait and pop data is 2
producer push data is 3
producer push data is 4
consumer2 try_pop data is 3
consumer1 wait and pop data is 4
producer push data is 5
producer push data is 6
producer push data is 7
consumer2 try_pop data is 5
consumer1 wait and pop data is 6

我们能看到consumer1和consumer2是并发消费的

总结

本文介绍了如何通过条件变量实现并发线程的同步处理。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

线程安全的单例模式

Posted on 2023-09-03 | In C++

简介

本文介绍C++ 线程安全的单例模式如何实现,通过介绍单例模式的演变历程,给读者更完备的实现单例模式的方案。

局部静态变量

我们知道当一个函数中定义一个局部静态变量,那么这个局部静态变量只会初始化一次,就是在这个函数第一次调用的时候,以后无论调用几次这个函数,函数内的局部静态变量都不再初始化。
那我们可以利用局部静态变量这一特点实现单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Single2 {
private:
Single2()
{
}
Single2(const Single2&) = delete;
Single2& operator=(const Single2&) = delete;
public:
static Single2& GetInst()
{
static Single2 single;
return single;
}
};

上述版本的单例模式在C++11 以前存在多线程不安全的情况,编译器可能会初始化多个静态变量。
但是C++11推出以后,各厂商优化编译器,能保证线程安全。所以为了保证运行安全请确保使用C++11以上的标准。

饿汉式初始化

在C++11 推出以前,局部静态变量的方式实现单例存在线程安全问题,所以部分人推出了一种方案,就是在主线程启动后,其他线程没有启动前,由主线程先初始化单例资源,这样其他线程获取的资源就不涉及重复初始化的情况了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//饿汉式
class Single2Hungry
{
private:
Single2Hungry()
{
}
Single2Hungry(const Single2Hungry&) = delete;
Single2Hungry& operator=(const Single2Hungry&) = delete;
public:
static Single2Hungry* GetInst()
{
if (single == nullptr)
{
single = new Single2Hungry();
}
return single;
}
private:
static Single2Hungry* single;
};

调用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//饿汉式初始化
Single2Hungry* Single2Hungry::single = Single2Hungry::GetInst();
void thread_func_s2(int i)
{
std::cout << "this is thread " << i << std::endl;
std::cout << "inst is " << Single2Hungry::GetInst() << std::endl;
}
void test_single2hungry()
{
std::cout << "s1 addr is " << Single2Hungry::GetInst() << std::endl;
std::cout << "s2 addr is " << Single2Hungry::GetInst() << std::endl;
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_s2, i);
tid.join();
}
}

饿汉式是从使用角度规避多线程的安全问题,很多情况下我们很难从规则角度限制开发人员,所以这种方式不是很推荐。

懒汉式初始化

很多人觉得什么时候调用初始化是用户的权利,不应该加以限制,所以就有了懒汉式方式初始化资源,在用到时如果没有初始化单例则初始化,如果初始化了则直接使用.
所以这种方式我们要加锁,防止资源被重复初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SinglePointer
{
private:
SinglePointer()
{
}
SinglePointer(const SinglePointer&) = delete;
SinglePointer& operator=(const SinglePointer&) = delete;
public:
static SinglePointer* GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = new SinglePointer();
s_mutex.unlock();
return single;
}
private:
static SinglePointer* single;
static std::mutex s_mutex;
};

调用如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
SinglePointer* SinglePointer::single = nullptr;
std::mutex SinglePointer::s_mutex;
void thread_func_lazy(int i)
{
std::cout << "this is lazy thread " << i << std::endl;
std::cout << "inst is " << SinglePointer::GetInst() << std::endl;
}
void test_singlelazy()
{
for (int i = 0; i < 3; i++)
{
std::thread tid(thread_func_lazy, i);
tid.join();
}
//何时释放new的对象?造成内存泄漏
}

这种方式存在一个很严重的问题,就是当多个线程都调用单例函数时,我们不确定资源是被哪个线程初始化的。
回收指针存在问题,存在多重释放或者不知道哪个指针释放的问题。

智能指针

我们能想到一个自动初始化资源并且自动释放的方式就是智能指针。利用智能指针自动回收资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//可以利用智能指针完成自动回收

class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

调用方式如下

1
2
3
4
5
6
7
8
9
10
11
std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;
void test_singleauto()
{
auto sp1 = SingleAuto::GetInst();
auto sp2 = SingleAuto::GetInst();
std::cout << "sp1 is " << sp1 << std::endl;
std::cout << "sp2 is " << sp2 << std::endl;
//此时存在隐患,可以手动删除裸指针,造成崩溃
// delete sp1.get();
}

这样开辟的资源交给智能指针管理免去了回收资源的麻烦。
但是有些人觉得虽然智能指针能自动回收内存,如果有开发人员手动delete指针怎么办?
所以有人提出了利用辅助类帮助智能指针释放资源,将智能指针的析构设置为私有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//为了规避用户手动释放内存,可以提供一个辅助类帮忙回收内存
//并将单例类的析构函数写为私有

class SingleAutoSafe;
class SafeDeletor
{
public:
void operator()(SingleAutoSafe* sf)
{
std::cout << "this is safe deleter operator()" << std::endl;
delete sf;
}
};
class SingleAutoSafe
{
private:
SingleAutoSafe() {}
~SingleAutoSafe()
{
std::cout << "this is single auto safe deletor" << std::endl;
}
SingleAutoSafe(const SingleAutoSafe&) = delete;
SingleAutoSafe& operator=(const SingleAutoSafe&) = delete;
//定义友元类,通过友元类调用该类析构函数
friend class SafeDeletor;
public:
static std::shared_ptr<SingleAutoSafe> GetInst()
{
//1处
if (single != nullptr)
{
return single;
}
s_mutex.lock();
//2处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器
//3 处
single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDeletor());
//也可以指定删除函数
// single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAutoSafe> single;
static std::mutex s_mutex;
};

SafeDeletor就是删除的辅助类,实现了仿函数。构造智能指针时指定了SafeDeletor对象,这样就能帮助智能指针释放了。

但是上面的代码存在危险,比如懒汉式的使用方式,当多个线程调用单例时,有一个线程加锁进入3处的逻辑。
其他的线程有的在1处,判断指针非空则跳过初始化直接使用单例的内存会存在问题。
主要原因在于SingleAutoSafe * temp = new SingleAutoSafe() 这个操作是由三部分组成的
1 调用allocate开辟内存
2 调用construct执行SingleAutoSafe的构造函数
3 调用赋值操作将地址赋值给temp

而现实中2和3的步骤可能颠倒,所以有可能在一些编译器中通过优化是1,3,2的调用顺序,
其他线程取到的指针就是非空,还没来的及调用构造函数就交给外部使用造成不可预知错误。
为解决这个问题,C++11 推出了std::call_once函数保证多个线程只执行一次

call_once

C++11 提出了call_once函数,我们可以配合一个局部的静态变量once_flag实现线程安全的初始化。
多线程调用call_once函数时,会判断once_flag是否被初始化,如没被初始化则进入初始化流程,调用我们提供的初始化函数。
但是同一时刻只有一个线程能进入这个初始化函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SingletonOnce {
private:
SingletonOnce() = default;
SingletonOnce(const SingletonOnce&) = delete;
SingletonOnce& operator = (const SingletonOnce& st) = delete;
static std::shared_ptr<SingletonOnce> _instance;

public :
static std::shared_ptr<SingletonOnce> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<SingletonOnce>(new SingletonOnce);
});

return _instance;
}

void PrintAddress() {
std::cout << _instance.get() << std::endl;
}

~SingletonOnce() {
std::cout << "this is singleton destruct" << std::endl;
}
};

std::shared_ptr<SingletonOnce> SingletonOnce::_instance = nullptr;

调用方式如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestSingle() {

std::thread t1([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});

std::thread t2([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});

t1.join();
t2.join();
}

为了使用单例类更通用,比如项目中使用多个单例类,可以通过继承实现多个单例类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//为了让单例更加通用,可以做成模板类
template <typename T>
class Singleton {
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

比如我们想实现单例类,就像我们之前在网络编程中介绍的那样,可以通过继承实现单例模式

1
2
3
4
5
6
7
8
9
//想使用单例类,可以继承上面的模板,我们在网络编程中逻辑单例类用的就是这种方式
class LogicSystem :public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>;
public:
~LogicSystem(){}
private:
LogicSystem(){}
};

总结

如果你只是实现一个简单的单例类推荐使用返回局部静态变量的方式
如果想大规模实现多个单例类可以用call_once实现的模板类。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(4) unique_lock,读写锁以及递归锁

Posted on 2023-08-31 | In C++

简介

本文介绍C++ 并发中使用的其他类型的锁,包括unique_lock,shared_lock, 以及recursive_lock等。shared_lock和unique_lock比较常用,而recursive_lock用的不多,或尽可能规避用这种锁。

unique_lock

unique_lock和lock_guard基本用法相同,构造时默认加锁,析构时默认解锁,但unique_lock有个好处就是可以手动解锁。这一点尤为重要,方便我们控制锁住区域的粒度(加锁的范围大小),也能支持和条件变量配套使用,至于条件变量我们之后再介绍,本文主要介绍锁的相关操作。

1
2
3
4
5
6
7
8
9
10
//unique_lock 基本用法
std::mutex mtx;
int shared_data = 0;
void use_unique() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
std::cout << "lock success" << std::endl;
shared_data++;
lock.unlock();
}

我们可以通过unique_lock的owns_lock判断是否持有锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//可判断是否占有锁
void owns_lock() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}

lock.unlock();
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
}

上述代码输出

1
2
owns lock
doesn't own lock

unique_lock可以延迟加锁

1
2
3
4
5
6
7
8
9
 //可以延迟加锁
void defer_lock() {
//延迟加锁
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
//可以加锁
lock.lock();
//可以自动析构解锁,也可以手动解锁
lock.unlock();
}

那我们写一段代码综合运用owns_lock和defer_lock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//同时使用owns和defer
void use_own_defer() {
std::unique_lock<std::mutex> lock(mtx);
// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Main thread has the lock." << std::endl;
}
else
{
std::cout << "Main thread does not have the lock." << std::endl;
}

std::thread t([]() {
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);

// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}

// 加锁
lock.lock();

// 判断是否拥有锁
if (lock.owns_lock())
{
std::cout << "Thread has the lock." << std::endl;
}
else
{
std::cout << "Thread does not have the lock." << std::endl;
}

// 解锁
lock.unlock();
});
t.join();
}

上述代码回依次输出, 但是程序会阻塞,因为子线程会卡在加锁的逻辑上,因为主线程未释放锁,而主线程又等待子线程退出,导致整个程序卡住。

1
2
Main thread has the lock.
Thread does not have the lock.

和lock_guard一样,unique_lock也支持领养锁

1
2
3
4
5
6
7
8
9
10
11
12
//同样支持领养操作
void use_own_adopt() {
mtx.lock();
std::unique_lock<std::mutex> lock(mtx, std::adopt_lock);
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "does not have the lock" << std::endl;
}
lock.unlock();
}

尽管是领养的,但是打印还是会出现owns lock,因为不管如何锁被加上,就会输出owns lock。

既然unique_lock支持领养操作也支持延迟加锁,那么可以用两种方式实现前文lock_guard实现的swap操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//之前的交换代码可以可以用如下方式等价实现
int a = 10;
int b = 99;
std::mutex mtx1;
std::mutex mtx2;

void safe_swap() {
std::lock(mtx1, mtx2);
std::unique_lock<std::mutex> lock1(mtx1, std::adopt_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::adopt_lock);
std::swap(a, b);
//错误用法
//mtx1.unlock();
//mtx2.unlock();
}

void safe_swap2() {

std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
//需用lock1,lock2加锁
std::lock(lock1, lock2);
//错误用法
//std::lock(mtx1, mtx2);
std::swap(a, b);
}

大家注意一旦mutex被unique_lock管理,加锁和释放的操作就交给unique_lock,不能调用mutex加锁和解锁,因为锁的使用权已经交给unique_lock了。

我们知道mutex是不支持移动和拷贝的,但是unique_lock支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移其归属权.

1
2
3
4
5
6
7
8
9
10
11
12
//转移互斥量所有权
//互斥量本身不支持move操作,但是unique_lock支持
std::unique_lock <std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
return lock;
}

void use_return() {
std::unique_lock<std::mutex> lock(get_lock());
shared_data++;
}

锁的粒度表示加锁的精细程度,一个锁的粒度要足够大,保证可以锁住要访问的共享数据。

同时一个锁的粒度要足够小,保证非共享数据不被锁住影响性能。

而unique_ptr则很好的支持手动解锁。

1
2
3
4
5
6
7
8
9
void precision_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
lock.unlock();
//不设计共享数据的耗时操作不要放在锁内执行
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock();
shared_data++;
}

共享锁

试想这样一个场景,对于一个DNS服务,我们可以根据域名查询服务对应的ip地址,它很久才更新一次,比如新增记录,删除记录或者更新记录等。平时大部分时间都是提供给外部查询,对于查询操作,即使多个线程并发查询不加锁也不会有问题,但是当有线程修改DNS服务的ip记录或者增减记录时,其他线程不能查询,需等待修改完再查询。或者等待查询完,线程才能修改。也就是说读操作并不是互斥的,同一时间可以有多个线程同时读,但是写和读是互斥的,写与写是互斥的,简而言之,写操作需要独占锁。而读操作需要共享锁。

要想使用共享锁,需使用共享互斥量std::shared_mutex,std::shared_mutex是C++17标准提出的。
C++14标准可以使用std::shared_time_mutex,

std::shared_mutex 和 std::shared_timed_mutex 都是用于实现多线程并发访问共享数据的互斥锁,但它们之间存在一些区别:

  1. std::shared_mutex:
* 提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及 `try_lock_until()` 函数,这些函数都可以用于获取互斥锁。
* 提供了 `try_lock_shared()` 和 `lock_shared()` 函数,这些函数可以用于获取共享锁。
* 当 `std::shared_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁。
  1. std::shared_timed_mutex:
* 与 `std::shared_mutex` 类似,也提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及 `try_lock_until()` 函数用于获取互斥锁。
* 与 `std::shared_mutex` 不同的是,它还提供了 `try_lock_shared()` 和 `lock_shared()` 函数用于获取共享锁,这些函数在尝试获取共享锁时具有超时机制。
* 当 `std::shared_timed_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁,这与 `std::shared_mutex` 相同。然而,当尝试获取共享锁时,如果不能立即获得锁,`std::shared_timed_mutex` 会设置一个超时,超时过后如果仍然没有获取到锁,则操作将返回失败。

因此,std::shared_timed_mutex 提供了额外的超时机制,这使得它在某些情况下更适合于需要处理超时的并发控制。然而,如果不需要超时机制,可以使用更简单的 std::shared_mutex。

C++11标准没有共享互斥量,可以使用boost提供的boost::shared_mutex。

如果我们想构造共享锁,可以使用std::shared_lock,如果我们想构造独占锁, 可以使用std::lock_gurad.

我们用一个类DNService代表DNS服务,查询操作使用共享锁,而写操作使用独占锁,可以是如下方式的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DNService {
public:
DNService() {}
//读操作采用共享锁
std::string QueryDNS(std::string dnsname) {
std::shared_lock<std::shared_mutex> shared_locks(_shared_mtx);
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
return iter->second;
}

return "";
}

//写操作采用独占锁
void AddDNSInfo(std::string dnsname, std::string dnsentry) {
std::lock_guard<std::shared_mutex> guard_locks(_shared_mtx);
_dns_info.insert(std::make_pair(dnsname, dnsentry));
}
private:
std::map<std::string, std::string> _dns_info;
mutable std::shared_mutex _shared_mtx;
};

QueryDNS 用来查询dns信息,多个线程可同时访问。
AddDNSInfo 用来添加dns信息,属独占锁,同一时刻只有一个线程在修改。

递归锁

有时候我们在实现接口的时候内部加锁,接口内部调用完结束自动解锁。会出现一个接口调用另一个接口的情况,如果用普通的std::mutex就会出现卡死,因为嵌套加锁导致卡死。但是我们可以使用递归锁。

但我个人并不推荐递归锁,可以从设计源头规避嵌套加锁的情况,我们可以将接口相同的功能抽象出来,统一加锁。下面的设计演示了如何使用递归锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class RecursiveDemo {
public:
RecursiveDemo() {}
bool QueryStudent(std::string name) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
return false;
}

return true;
}

void AddScore(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
if (!QueryStudent(name)) {
_students_info.insert(std::make_pair(name, score));
return;
}

_students_info[name] = _students_info[name] + score;
}

//不推荐采用递归锁,使用递归锁说明设计思路并不理想,需优化设计
//推荐拆分逻辑,将共有逻辑拆分为统一接口
void AddScoreAtomic(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
_students_info.insert(std::make_pair(name, score));
return;
}

_students_info[name] = _students_info[name] + score;
return;
}
private:

std::map<std::string, int> _students_info;
std::recursive_mutex _recursive_mtx;
};

我们可以看到AddScore函数内部调用了QueryStudent, 所以采用了递归锁。

但是我们同样可以改变设计,将两者公有的部分抽离出来生成一个新的接口AddScoreAtomic.

AddScoreAtomic可以不适用递归锁,照样能完成线程安全操作的目的。

总结

本文介绍了unique_lock,共享锁,递归锁等的使用,较为全面的介绍了这几种锁的使用场景和潜在风险。

视频链接

https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接

https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(3) 互斥与死锁

Posted on 2023-08-20 | In C++

简介

本文介绍如何使用互斥量保证共享数据的安全,并讲述死锁的相关处理方案。

锁的使用

我们可以通过mutex对共享数据进行夹所,防止多线程访问共享区造成数据不一致问题。如下,我们初始化一个共享变量shared_data,然后定义了一个互斥量std::mutex,接下来启动了两个线程,分别执行use_lock增加数据,和一个lambda表达式减少数据。
结果可以看到两个线程对于共享数据的访问是独占的,单位时间片只有一个线程访问并输出日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::mutex  mtx1;
int shared_data = 100;

void use_lock() {
while (true) {
mtx1.lock();
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}

}

void test_lock() {
std::thread t1(use_lock);

std::thread t2([]() {
while (true) {
mtx1.lock();
shared_data--;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});

t1.join();
t2.join();
}

lock_guard的使用

当然我们可以用lock_guard自动加锁和解锁,比如上面的函数可以等价简化为

1
2
3
4
5
6
7
8
9
10
void use_lock() {
while (true) {
std::lock_guard<std::mutex> lock(mtx1);
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}

}

lock_guard在作用域结束时自动调用其析构函数解锁,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开。

如何保证数据安全

有时候我们可以将对共享数据的访问和修改聚合到一个函数,在函数内加锁保证数据的安全性。但是对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性。比如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空,判断操作内部我们可以加锁,但是判断结束后返回值就不在加锁了,就会存在线程安全问题。

比如我定义了如下栈, 对于多线程访问时判断栈是否为空,此后两个线程同时出栈,可能会造成崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template<typename T>
class threadsafe_stack1
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack1() {}
threadsafe_stack1(const threadsafe_stack1& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack1& operator=(const threadsafe_stack1&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}

//问题代码
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

如下, 线程1和线程2先后判断栈都不为空,之后执行出战操作,会造成崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);

std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

t1.join();
t2.join();
}

解决这个问题我们可以用抛出异常的方式,比如定义一个空栈的异常

1
2
3
4
struct empty_stack : std::exception
{
const char* what() const throw();
};

然后实现我们的出栈函数

1
2
3
4
5
6
7
8
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}

这么做就需要在外层使用的时候捕获异常。这是C++ 并发编程中提及的建议。但是我觉得可以在函数pop内部再次判断栈是否为空,若为空则返回一个非法数据,这样比抛出异常好一些。但是如果T是一个复杂类型,我们很难定义一个非法值给外界知晓,这一点可以通过智能指针进行优化。之后我们再介绍更优化的方案,因为现在这个pop函数仍存在问题,比如T是一个vector<int>类型,那么在pop函数内部element就是vector<int>类型,开始element存储了一些int值,程序没问题,函数执行pop操作, 假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够, 函数返回element时,就会就会存在vector做拷贝赋值时造成失败。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。这其实是内存管理不当造成的,但是C++ 并发编程一书中给出了优化方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
struct empty_stack : std::exception
{
const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

我们提供了两个版本的pop操作,一个是带引用类型的参数的,一个是直接pop出智能指针类型,这样在pop函数内部减少了数据的拷贝,防止内存溢出,其实这两种做法确实是相比之前直接pop固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop时如果队列为空则返回空指针,这样比抛出异常更有好一些

1
2
3
4
5
6
7
8
9
10
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) return nullptr;
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

死锁怎么造成的

死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。
举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::mutex  t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;

void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}

void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}

然后我们启动两个线程

1
2
3
4
5
6
void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}

这样运行之后在某一个时刻一定会导致死锁。
实际工作中避免死锁的一个方式就是将加锁和解锁的功能封装为独立的函数,
这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
std::cout << "lock1 end lock" << std::endl;
}

void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
std::cout << "lock2 end lock" << std::endl;
}

void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

void safe_lock2() {
while (true) {
atomic_lock2();
atomic_lock1();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}

同时加锁

当我们无法避免在一个函数内部使用两个互斥量,并且都要解锁的情况,那我们可以采取同时加锁的方式。我们先定义一个类,假设这个类不推荐拷贝构造,但我们也提供了这个类的拷贝构造和移动构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class som_big_object {
public:
som_big_object(int data) :_data(data) {}
//拷贝构造
som_big_object(const som_big_object& b2) :_data(b2._data) {
_data = b2._data;
}
//移动构造
som_big_object(som_big_object&& b2) :_data(std::move(b2._data)) {

}
//重载输出运算符
friend std::ostream& operator << (std::ostream& os, const som_big_object& big_obj) {
os << big_obj._data;
return os;
}

//重载赋值运算符
som_big_object& operator = (const som_big_object& b2) {
if (this == &b2) {
return *this;
}
_data = b2._data;
return *this;
}

//交换数据
friend void swap(som_big_object& b1, som_big_object& b2) {
som_big_object temp = std::move(b1);
b1 = std::move(b2);
b2 = std::move(temp);
}
private:
int _data;
};

接下来我们定义一个类对上面的类做管理,为防止多线程情况下数据混乱, 包含了一个互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
class big_object_mgr {
public:
big_object_mgr(int data = 0) :_obj(data) {}
void printinfo() {
std::cout << "current obj data is " << _obj << std::endl;
}
friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2);
private:
std::mutex _mtx;
som_big_object _obj;
};

为了方便演示哪些交换是安全的,哪些是危险的,所以写了三个函数。

1
2
3
4
5
6
7
8
9
10
11
12
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock_guard <std::mutex> gurad1(objm1._mtx);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> guard2(objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

danger_swap是危险的交换方式。比如如下调用

1
2
3
4
5
6
7
8
9
10
11
12
void  test_danger_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);

std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();

objm1.printinfo();
objm2.printinfo();
}

这种调用方式存在隐患,因为danger_swap函数在两个线程中使用会造成互相竞争加锁的情况。
那就需要用锁同时锁住两个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}

std::lock(objm1._mtx, objm2._mtx);
//领养锁管理它自动释放
std::lock_guard <std::mutex> gurad1(objm1._mtx, std::adopt_lock);

//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));

std::lock_guard <std::mutex> gurad2(objm2._mtx, std::adopt_lock);

swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

比如下面的调用就是合理的

1
2
3
4
5
6
7
8
9
10
11
12
void test_safe_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);

std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();

objm1.printinfo();
objm2.printinfo();
}

当然上面加锁的方式可以简化,C++17 scope_lock可以对多个互斥量同时加锁,并且自动释放

1
2
3
4
5
6
7
8
9
10
11
12
13
//上述代码可以简化为以下方式
void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}

std::scoped_lock guard(objm1._mtx, objm2._mtx);
//等价于
//std::scoped_lock<std::mutex, std::mutex> guard(objm1._mtx, objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

层级锁

现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁,所以可以自定义一个层级锁,保证实际项目中对多个互斥量加锁时是有序的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//层级锁
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}
hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;
void lock() {
check_for_hierarchy_violation();
_internal_mutex.lock();
update_hierarchy_value();
}

void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}

_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}

bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}

update_hierarchy_value();
return true;
}
private:
std::mutex _internal_mutex;
//当前层级值
unsigned long const _hierarchy_value;
//上一次层级值
unsigned long _previous_hierarchy_value;
//本线程记录的层级值
static thread_local unsigned long _this_thread_hierarchy_value;

void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}

void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}
};

thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);

void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000);
hierarchical_mutex hmtx2(500);
std::thread t1([&hmtx1, &hmtx2]() {
hmtx1.lock();
hmtx2.lock();
hmtx2.unlock();
hmtx1.unlock();
});

std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock();
hmtx1.lock();
hmtx1.unlock();
hmtx2.unlock();
});

t1.join();
t2.join();
}

层级锁能保证我们每个线程加锁时,一定是先加权重高的锁。
并且释放时也保证了顺序。
主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁。

总结

本文介绍了线程互斥的常见问题和基本处理方案,在实际开发中,根据具体情境具体分析。

视频链接https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(2) 线程管控

Posted on 2023-08-12 | In C++

简介

本节介绍C++线程管控,包括移交线程的归属权,线程并发数量控制以及获取线程id等基本操作。

线程归属权

我们之前介绍了线程可以通过detach在后台运行或者让开辟这个线程的父线程等待该线程完成。
但每个线程都应该有其归属权,也就是归属给某个变量管理。比如

1
2
3
4
5
void some_function() {

}

std::thread t1(some_function);

t1是一个线程变量,管理一个线程,该线程执行some_function()
对于std::thread C++ 不允许其执行拷贝构造和拷贝赋值, 所以只能通过移动和局部变量返回的方式将线程变量管理的线程转移给其他变量管理。
C++ 中类似的类型还有std::mutex, std::ifstream, std::unique_ptr。
比如下面,我们说明了线程归属权的转移方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

void some_other_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

//t1 绑定some_function
std::thread t1(some_function);
//2 转移t1管理的线程给t2,转移后t1无效
std::thread t2 = std::move(t1);
//3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
//4 创建一个线程变量t3
std::thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
//6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));

上面的代码会引发崩溃,是因为步骤6造成的崩溃。
让主函数睡眠2000秒,是为了告诉规避主函数退出引发崩溃的问题,因为我们在之前给大家演示过,如果线程不detach或者join,主线程退出时会引发崩溃,而我们这些线程没有join和detach,为了给大家演示是因为步骤6引发的崩溃,所以让主线程睡眠2000秒暂时不退出,但是程序仍然会崩溃,说明是步骤6导致的崩溃。

上面代码将t2管理的线程交给t3
之后将t3管理的线程交给t1,此时t1管理线程运行着 some_function
步骤6导致崩溃的原因就是将t3管理的线程交给t1,而此时t1正在管理线程运行some_function。
所以我们可以得出一个结论,就是不要将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。

和std::unique_ptr一样,我们可以在函数内部返回一个局部的std::thread变量。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::thread  f() {
return std::thread(some_function);
}

void param_function(int a) {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

std::thread g() {
std::thread t(param_function, 43);
}

因为C++ 在返回局部变量时,会优先寻找这个类的拷贝构造函数,如果没有就会使用这个类的移动构造函数。

joining_thread

曾经有一份C++17标准的备选提案,主张引入新的类joining_thread,它与std::thread类似,但只要其执行析构函数,线程即能自动汇合,这点与scoped_thread非常像。可惜C++标准委员会未能达成共识,结果C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。除去这些,实际上joining_thread类的代码相对容易编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}

joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}


~joining_thread() noexcept {
if (joinable()) {
join();
}
}

void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}

std::thread::id get_id() const noexcept {
return _t.get_id();
}

bool joinable() const noexcept {
return _t.joinable();
}

void join() {
_t.join();
}

void detach() {
_t.detach();
}

std::thread& as_thread() noexcept {
return _t;
}

const std::thread& as_thread() const noexcept {
return _t;
}
};

使用起来比较简单,我们直接构造一个joining_thread对象即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void use_jointhread() {
//1 根据线程构造函数构造joiningthread
joining_thread j1([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10);

//2 根据thread构造joiningthread
joining_thread j2(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));

//3 根据thread构造j3
joining_thread j3(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));


//4 把j3赋值给j1,joining_thread内部会等待j1汇合结束后
//再将j3赋值给j1
j1 = std::move(j3);
ervice
}

容器存储

容器存储线程时,比如vector,如果用push_back操作势必会调用std::thread,这样会引发编译错误,因为std::thread没有拷贝构造函数。我们在之前网络编程实现IOServicePool或者IOThreadPool时初始化了多个线程存储在vector中, 采用的时emplace方式,可以直接根据线程构造函数需要的参数构造,这样就避免了调用thread的拷贝构造函数。 类似于这种

1
2
3
4
5
6
7
8
9
10
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
threads.emplace_back(param_function, i);
}

for (auto& entry : threads) {
entry.join();
}
}

选择运行数量

借用C++标准库的std::thread::hardware_concurrency()函数,它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量.
我们可以模拟实现一个并行计算的功能,计算容器内所有元素的和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init; //⇽-- - ①
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread; //⇽-- - ②
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); //⇽-- - ③
unsigned long const block_size = length / num_threads; //⇽-- - ④
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - ⑤
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size); //⇽-- - ⑥
threads[i] = std::thread(//⇽-- - ⑦
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; //⇽-- - ⑧
}
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]); //⇽-- - ⑨

for (auto& entry : threads)
entry.join(); //⇽-- - ⑩
return std::accumulate(results.begin(), results.end(), init); //⇽-- - ⑪
}

void use_parallel_acc() {
std::vector <int> vec(10000);
for (int i = 0; i < 10000; i++) {
vec.push_back(i);
}
int sum = 0;
sum = parallel_accumulate<std::vector<int>::iterator, int>(vec.begin(),
vec.end(), sum);

std::cout << "sum is " << sum << std::endl;
}

上面的代码1处判断要计算的容器内元素为0个则返回。

2处计算最大开辟的线程数,我们预估每个线程计算25个数据长度。

但是我们可以通过std::thread::hardware_concurrency返回cpu的核数,我们期待的是开辟的线

程数小于等于cpu核数,这样才不会造成线程过多时间片切换开销。

所以3处计算了适合开辟线程数的最小值。

4处计算了步长,根据步长移动迭代器然后开辟线程计算。

5处初始化了线程数-1个大小的vector,因为主线程也参与计算,所以这里-1.

6处移动步长,7处开辟线程,8处更新起始位置。

9处为主线程计算。

10 处让所有线程join

11 处最后将所有计算结果再次调用std的accumulate算出结果。

识别线程

所谓识别线程就是获取线程id,可以根据线程id是否相同判断是否同一个线程。
比如我们启动了一个线程,我们可以通过线程变量的get_id()获取线程id

1
2
3
4
5
std::thread t([](){
std::cout << "thread start" << std::endl;
});

t.get_id();

但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这总方式

1
2
3
4
5
std::thread t([](){
std::cout << "in thread id " <<
std::this_thread::get_id() << std::endl;
std::cout << "thread start" << std::endl;
});

总结

本文介绍了线程管控相关的知识,包括线程的join,detach,以及识别线程,归属权转移,如何管理等等。

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(1) 线程基础

Posted on 2023-07-31 | In C++

简介

本文主要介绍线程的基本管控,包括线程的发起,等待,异常条件下如何等待以及后台运行等基础操作。

线程发起

线程发起顾名思义就是启动一个线程,C++11标准统一了线程操作,可以在定义一个线程变量后,该变量启动线程执行回调逻辑。如下即可发起一个线程

1
2
3
4
5
6
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);

线程等待

当我们启动一个线程后,线程可能没有立即执行,如果在局部作用域启动了一个线程,或者main函数中,很可能子线程没运行就被回收了,回收时会调用线程的析构函数,执行terminate操作。所以为了防止主线程退出或者局部作用域结束导致子线程被析构的情况,我们可以通过join,让主线程等待子线程启动运行,子线程运行结束后主线程再运行。

1
2
3
4
5
std::string hellostr = "hello world!";
//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);
//2 主线程等待子线程退出
t1.join();

仿函数作为参数

当我们用仿函数作为参数传递给线程时,也可以达到启动线程执行某种操作的含义

1
2
3
4
5
6
class background_task {
public:
void operator()(std::string str) {
std::cout << "str is " << str << std::endl;
}
};

如果采用如下方式启动函数,那一定会报错的。

1
2
std::thread t2(background_task());
t2.join();

因为编译器会将t2当成一个函数对象, 返回一个std::thread类型的值, 函数的参数为一个函数指针,该函数指针返回值为background_task, 参数为void。可以理解为如下

1
"std::thread (*)(background_task (*)())"

修改的方式很简单

1
2
3
4
5
6
7
//可多加一层()
std::thread t2((background_task()));
t2.join();

//可使用{}方式初始化
std::thread t3{ background_task() };
t3.join();

lambda表达式

lambda 表达式也可以作为线程的参数传递给thread

1
2
3
4
5
6

std::thread t4([](std::string str) {
std::cout << "str is " << str << std::endl;
}, hellostr);

t4.join();

线程detach

线程允许采用分离的方式在后台独自运行,C++ concurrent programing书中称其为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

struct func {
int& _i;
func(int & i): _i(i){}
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i;
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
};

void oops() {

int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread(myfunc);
//隐患,访问局部变量,局部变量可能会随着}结束而回收或随着主线程退出而回收
functhread.detach();
}

// detach 注意事项
oops();
//防止主线程退出过快,需要停顿一下,让子线程跑起来detach
std::this_thread::sleep_for(std::chrono::seconds(1));

上面的例子存在隐患,因为some_local_state是局部变量, 当oops调用结束后局部变量some_local_state就可能被释放了,而线程还在detach后台运行,容易出现崩溃。
所以当我们在线程中使用局部变量的时候可以采取几个措施解决局部变量的问题

  • 通过智能指针传递参数,因为引用计数会随着赋值增加,可保证局部变量在使用期间不被释放,这也就是我们之前提到的伪闭包策略。
  • 将局部变量的值作为参数传递,这么做需要局部变量有拷贝复制的功能,而且拷贝耗费空间和效率。
  • 将线程运行的方式修改为join,这样能保证局部变量被释放前线程已经运行结束。但是这么做可能会影响运行逻辑。
    比如下面的修改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void use_join() {
    int some_local_state = 0;
    func myfunc(some_local_state);
    std::thread functhread(myfunc);
    functhread.join();
    }

    // join 用法
    use_join();

    异常处理

当我们启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行一些重要的操作比如将充值信息入库等,丢失这些信息是很危险的。所以常用的做法是捕获异常,并且在异常情况下保证子线程稳定运行结束后,主线程抛出异常结束运行。如下面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void catch_exception() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread{ myfunc };
try {
//本线程做一些事情,可能引发崩溃
std::this_thread::sleep_for(std::chrono::seconds(1));
}catch (std::exception& e) {
functhread.join();
throw;
}

functhread.join();
}

但是用这种方式编码,会显得臃肿,可以采用RAII技术,保证线程对象析构的时候等待线程运行结束,回收资源。如果大家还记得我基于asio实现异步服务时,逻辑处理类LogicSystem的析构函数里等待线程退出。那我们写一个简单的线程守卫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class thread_guard {
private:
std::thread& _t;
public:
explicit thread_guard(std::thread& t):_t(t){}
~thread_guard() {
//join只能调用一次
if (_t.joinable()) {
_t.join();
}
}

thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};

可以这么使用

1
2
3
4
5
6
7
8
9
10
void auto_guard() {
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
//本线程做一些事情
std::cout << "auto guard finished " << std::endl;
}

auto_guard();

慎用隐式转换

C++中会有一些隐式转换,比如char* 转换为string等。这些隐式转换在线程的调用上可能会造成崩溃问题

1
2
3
4
5
6
7
8
9
10
void danger_oops(int som_param) {
char buffer[1024];
sprintf(buffer, "%i", som_param);
//在线程内部将char const* 转化为std::string
//指针常量 char const* 指针本身不能变
//常量指针 const char * 指向的内容不能变
std::thread t(print_str, 3, buffer);
t.detach();
std::cout << "danger oops finished " << std::endl;
}

当我们定义一个线程变量thread t时,传递给这个线程的参数buffer会被保存到thread的成员变量中。
而在线程对象t内部启动并运行线程时,参数才会被传递给调用函数print_str。
而此时buffer可能随着}运行结束而释放了。
改进的方式很简单,我们将参数传递给thread时显示转换为string就可以了,
这样thread内部保存的是string类型。

1
2
3
4
5
6
void safe_oops(int some_param) {
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(print_str, 3, std::string(buffer));
t.detach();
}

关于为什么参数会像我说的这样保存和调用,我在之后会按照源码给大家讲一讲。

引用参数

当线程要调用的回调函数参数为引用类型时,需要将参数显示转化为引用对象传递给线程的构造函数,
如果采用如下调用会编译失败

1
2
3
4
5
6
7
8
9
10
11
12

void change_param(int& param) {
param++;
}

void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, some_param);
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

即使函数change_param的参数为int&类型,我们传递给t2的构造函数为some_param,也不会达到在change_param函数内部修改关联到外部some_param的效果。因为some_param在传递给thread的构造函数后会转变为右值保存,右值传递给一个左值引用会出问题,所以编译出了问题。
改为如下调用就可以了

1
2
3
4
5
6
7
void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, std::ref(some_param));
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

thread原理

为了详细说明thread参数传递和调用原理,我们看看源码

1
2
3
4
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}

thread构造函数内部通过forward原样转换传递给_Start函数。关于原样转换的知识可以看我之前写的文章。
_Start 函数内部就是启动了一个线程_beginthreadex执行回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

#pragma warning(push)
#pragma warning(disable : 5039) // pointer or reference to potentially throwing function passed to
// extern C function under -EHc. Undefined behavior may occur
// if this function throws an exception. (/Wall)
_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));
#pragma warning(pop)

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

我们对应ref_oops内部函数的调用,_Start的参数就是void _Start(change_param&& _Fx, int&& Ax)

_beginthreadex函数参数分别是安全参数,栈大小,调用函数地址,调用函数参数,初始标记,第三方参数地址。
我们关注_beginthreadex的调用函数和参数,调用函数为_Invoker_proc,参数为_Decay_copied
我们看看这两个变量的定义

1
2
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

_Decay_copied 可以理解为

1
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<change_param>(_Fx), _STD forward<int>

_Invoker_proc 可以理解为封装的可调用对象

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(int)>{});

我们做一个简化

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<2>{});

其实就是将参数的索引0,1按照序列传递给_Get_invoke
_Get_invoke原型为

1
2
3
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}

所以_Get_invoke函数原型为

1
2
3
4
5
6
7
8
9
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
// adapt invoke of user's callable object to _beginthreadex's thread procedure
const unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
_STD invoke(_STD move(_STD get<_Indices>(_Tup))...);
_Cnd_do_broadcast_at_thread_exit(); // TRANSITION, ABI
return 0;
}

所以我们可以理解为调用_Get_invoke就是调用invoke(_STD move(_STD get<_Indices>(_Tup))...);
invoke(_STD move(_STD get<_Indices>(_Tup))...);就是将回调函数和参数传递给invoke

1
2
3
CONSTEXPR17 auto invoke(_Callable&& _Obj, _Ty1&& _Arg1, _Types2&&... _Args2) noexcept(
noexcept(_Invoker1<_Callable, _Ty1>::_Call(
static_cast<_Callable&&>(_Obj), static_cast<_Ty1&&>(_Arg1), static_cast<_Types2&&>(_Args2)...)))

invoke实际就是调用了_Call函数,_Call的作用就是调用回调函数,并传递给回调函数参数,可以理解为向change_param
传递int类型的右值数据

1
change_param(int&& _Arg1)

这与change_param的定义不符合,change_param参数为左值引用, 不能绑定右值,也就是编译错误的原因。

所以在这里大家就理解了传递给thread 的参数都是按照右值的方式构造为Tuple类型,传递给系统级别函数_beginthreadex调用的。
那为什么使用std::ref就可以实现引用效果呢?
这里我们看下std::ref的源码

1
2
3
4
template <class _Ty>
_NODISCARD _CONSTEXPR20 reference_wrapper<_Ty> ref(_Ty& _Val) noexcept {
return reference_wrapper<_Ty>(_Val);
}

reference_wrapper是一个类类型,说白了就是将参数的地址和类型保存起来。

1
2
3
4
_CONSTEXPR20 reference_wrapper(_Uty&& _Val) noexcept(noexcept(_Refwrap_ctor_fun<_Ty>(_STD declval<_Uty>()))) {
_Ty& _Ref = static_cast<_Uty&&>(_Val);
_Ptr = _STD addressof(_Ref);
}

当我们要使用这个类对象时,自动转化为取内部参数的地址里的数据即可,就达到了和实参关联的效果

1
2
3
4
5
6
7
 _CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}

_NODISCARD _CONSTEXPR20 _Ty& get() const noexcept {
return *_Ptr;
}

所以我们可以这么理解传递给thread对象构造函数的参数,仍然作为右值被保存,如ref(int)实际是作为reference_wrapper(int)对象保存在threa的类成员里。
而调用的时候触发了仿函数()进而获取到外部实参的地址内的数据。

绑定类成员函数

有时候我们需要绑定一个类的成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
class X
{
public:
void do_lengthy_work() {
std::cout << "do_lengthy_work " << std::endl;
}
};

void bind_class_oops() {
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);
t.join();
}

这里大家注意一下,如果thread绑定的回调函数是普通函数,可以在函数前加&或者不加&,因为编译器默认将普通函数名作为函数地址,如下两种写法都正确

1
2
3
4
5
6
7
8
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

std::string hellostr = "hello world!";
//两种方式都正确
std::thread t1(thead_work1, hellostr);
std::thread t2(&thead_work1, hellostr);

但是如果是绑定类的成员函数,必须添加&

使用move操作

有时候传递给线程的参数是独占的,所谓独占就是不支持拷贝赋值和构造,但是我们可以通过std::move的方式将参数的所有权转移给线程,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void deal_unique(std::unique_ptr<int> p) {
std::cout << "unique ptr data is " << *p << std::endl;
(*p)++;

std::cout << "after unique ptr data is " << *p << std::endl;
}

void move_oops() {
auto p = std::make_unique<int>(100);
std::thread t(deal_unique, std::move(p));
t.join();
//不能再使用p了,p已经被move废弃
// std::cout << "after unique ptr data is " << *p << std::endl;
}

总结

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn

Linux C++ 安装和使用grpc和jsoncpp库

Posted on 2023-07-30 | In C++

简介

本文主要教会大家如何在Linux环境搭建C++ 所需的grpc和jsoncpp库,并教会大家如何编写cmake,并配置使用这些库。

解决vim乱码

为了解决Linux环境下打开vim中文乱码的问题
用vim打开用户目录下的vim配置文件

1
vim ~/.vimrc

配置如下

1
2
3
set termencoding=utf-8
set encoding=utf8
set fileencodings=utf8,ucs-bom,gbk,cp936,gb2312,gb18030

配置和使用jsoncpp

如果你是ubuntu系统,可以通过如下命令直接安装

1
可以通过指令apt install  libjsoncpp-dev 安装

但是如果是其他Linux系统最好是手动安装源码包,我的操作都是在ubuntu为基础镜像生成的docker中进行的,如果大家使用的是宿主机,可以直接安装。
推荐源码安装,去github下载

https://github.com/open-source-parsers/jsoncpp/releases

可以选择用wget 命令
我在电脑下好后传到云服务器上
然后在云服务器上copy到docker里, 如果你是在宿主机进行的,可以略去这一步。

1
docker cp /home/ubuntu/download/jsoncpp-1.9.5.tar.gz  cppubuntu:/test

进入容器

1
docker exec -it cppubuntu /bin/bash

接下来解压压缩包,无论docker还是宿主机内,都需执行如下命令

1
tar zxvf ./jsoncpp-1.9.5.tar.gz 

进入到源码目录

1
cd  ./json

创建目录

1
mkdir build

进入目录

1
cd build

执行cmake生成makefile

1
cmake ../

执行make

1
make

执行安装

1
make install

更新库

1
ldconfig

写一个jsoncpp的测试cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include<json/json.h>
#include<iostream>
using namespace std;

int main(int argc, char** argv)
{
Json::Value root;
Json::FastWriter fast;
root["ModuleType"]= Json::Value("你好");
root["ModuleCode"]= Json::Value("22");
root["ModuleDesc"]= Json::Value("33");
root["DateTime"]= Json::Value("44");
root["LogType"]= Json::Value("55");
cout<<fast.write(root)<<endl;
return 0;
}

执行 编译

1
g++ jsontest.cpp  -o jsontest -ljsoncpp

或者写个cmake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cmake_minimum_required(VERSION 3.12)
project(jsontest)

# 设置 C++ 标准
set(CMAKE_CXX_STANDARD 17)

# 添加可执行文件和源文件
file(GLOB SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
set(JSONCPP_INC_DIR /usr/local/include)

add_executable(jsontest ${SOURCES})

# 包含头文件路径(包括其他目录)
target_include_directories(jsontest
PRIVATE
${JSONCPP_INC_DIR}
)

# 链接 Boost 库
target_link_libraries(jsontest PRIVATE jsoncpp)

运行 ./jsontest
输出

1
{"DateTime":"44","LogType":"55","ModuleCode":"22","ModuleDesc":"33","ModuleType":"\u4f60\u597d"}

grpc配置和使用

克隆grpc指定分支

1
git clone -b v1.34.0 https://gitee.com/mirrors/grpc-framework.git grpc

进入目录并更新子模块

1
2
cd grpc
git submodule update --init

编译并生成grpc库

1
2
3
4
5
6
7
cd grpc
mkdir build
cd build
// 指定安装路径 /usr/local
cmake -DCMAKE_INSTALL_PREFIX=/usr/local ..
make -j2
sudo make install

测试安装成功与否

编译源代码中的helloworld文件夹下的文件,步骤如下:

1
2
3
4
5
6
7
#进入grpc文件夹下
cd examples/cpp/helloworld
mkdir build
cd build
# 编译
cmake ..
make -j8

编译完成后,分别执行greeter_server和greeter_client即可测试。

项目应用grpc

我们的项目中也用到了grpc, 需要编写一个CMakeLists.txt 配置grpc。
大家可以克隆我的boost项目代码

https://gitee.com/secondtonone1/boostasio-learn

进入day19-Grpc-Server目录,我们编写如下的CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
cmake_minimum_required(VERSION 3.1)

project(GrpcServer LANGUAGES CXX)

set(CMAKE_INCLUDE_CURRENT_DIR ON)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

#假设已经安装好grpc了
find_package(Threads REQUIRED)

set(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${Protobuf_VERSION}")

set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
set(_REFLECTION gRPC::grpc++_reflection)


# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")

set(_GRPC_GRPCPP gRPC::grpc++)

# 添加可执行文件和源文件
file(GLOB SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
file(GLOB PBSOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)


add_executable(GrpcServer ${SOURCES}
${PBSOURCES})

target_link_libraries(GrpcServer
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

我们新建一个build文件夹

1
mkdir build

进入build文件夹里,执行cmake .., 再执行make即可。
直接执行GrpcServer就可以看到我们的程序跑起来了。

总结

本文介绍了如何使用Linux环境下配置和使用grpc和cppjson库,我将docker打包为镜像提交到了网盘上,大家感兴趣可以下载看看

链接
提取码:5wng

可变参数模板+异步队列实现异步打印功能

Posted on 2023-07-23 | In C++

场景

公司有成熟的日志库,但是打印日志的功能是在调用线程里触发的,简而言之就是哪个线程调用了日志库的打印功能,就在哪个线程里输出,如果打印功能频繁,可能会影响该线程的逻辑处理。目前公司的软件界面显示和计算等逻辑很频繁,考虑用异步的方式实现打印。

Read more »

利用Docker搭建Linux C++ Boost开发环境

Posted on 2023-07-21 | In C++

简介

本文介绍如何使用Docker搭建Linux环境下C++的开发环境。众所周知C++的开发环境分为Windows和Linux两种,在windows配置C++开发环境并实现了Asio服务器的开发,我们也做过QT的配置和使用,前面的教程已经很完善了,接下来介绍如何在Linux系统部署C++开发环境。

Read more »
12…29>

283 posts
14 categories
18 tags
GitHub ZhiHu
© 2023 恋恋风辰 本站总访问量次 | 本站访客数人
Powered by Hexo
|
Theme — NexT.Muse v5.1.3