恋恋风辰的个人博客


  • Home

  • Archives

  • Categories

  • Tags

  • Search

C++ 并发(3) 互斥与死锁

Posted on 2023-08-20 | In C++

简介

本文介绍如何使用互斥量保证共享数据的安全,并讲述死锁的相关处理方案。

锁的使用

我们可以通过mutex对共享数据进行夹所,防止多线程访问共享区造成数据不一致问题。如下,我们初始化一个共享变量shared_data,然后定义了一个互斥量std::mutex,接下来启动了两个线程,分别执行use_lock增加数据,和一个lambda表达式减少数据。
结果可以看到两个线程对于共享数据的访问是独占的,单位时间片只有一个线程访问并输出日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::mutex  mtx1;
int shared_data = 100;

void use_lock() {
while (true) {
mtx1.lock();
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}

}

void test_lock() {
std::thread t1(use_lock);

std::thread t2([]() {
while (true) {
mtx1.lock();
shared_data--;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});

t1.join();
t2.join();
}

lock_guard的使用

当然我们可以用lock_guard自动加锁和解锁,比如上面的函数可以等价简化为

1
2
3
4
5
6
7
8
9
10
void use_lock() {
while (true) {
std::lock_guard<std::mutex> lock(mtx1);
shared_data++;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "sharad data is " << shared_data << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds(10));
}

}

lock_guard在作用域结束时自动调用其析构函数解锁,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开。

如何保证数据安全

有时候我们可以将对共享数据的访问和修改聚合到一个函数,在函数内加锁保证数据的安全性。但是对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性。比如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空,判断操作内部我们可以加锁,但是判断结束后返回值就不在加锁了,就会存在线程安全问题。

比如我定义了如下栈, 对于多线程访问时判断栈是否为空,此后两个线程同时出栈,可能会造成崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
template<typename T>
class threadsafe_stack1
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack1() {}
threadsafe_stack1(const threadsafe_stack1& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack1& operator=(const threadsafe_stack1&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}

//问题代码
T pop()
{
std::lock_guard<std::mutex> lock(m);
auto element = data.top();
data.pop();
return element;
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

如下, 线程1和线程2先后判断栈都不为空,之后执行出战操作,会造成崩溃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);

std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

t1.join();
t2.join();
}

解决这个问题我们可以用抛出异常的方式,比如定义一个空栈的异常

1
2
3
4
struct empty_stack : std::exception
{
const char* what() const throw();
};

然后实现我们的出栈函数

1
2
3
4
5
6
7
8
T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}

这么做就需要在外层使用的时候捕获异常。这是C++ 并发编程中提及的建议。但是我觉得可以在函数pop内部再次判断栈是否为空,若为空则返回一个非法数据,这样比抛出异常好一些。但是如果T是一个复杂类型,我们很难定义一个非法值给外界知晓,这一点可以通过智能指针进行优化。之后我们再介绍更优化的方案,因为现在这个pop函数仍存在问题,比如T是一个vector<int>类型,那么在pop函数内部element就是vector<int>类型,开始element存储了一些int值,程序没问题,函数执行pop操作, 假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够, 函数返回element时,就会就会存在vector做拷贝赋值时造成失败。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。这其实是内存管理不当造成的,但是C++ 并发编程一书中给出了优化方案。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
struct empty_stack : std::exception
{
const char* what() const throw();
};

template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

我们提供了两个版本的pop操作,一个是带引用类型的参数的,一个是直接pop出智能指针类型,这样在pop函数内部减少了数据的拷贝,防止内存溢出,其实这两种做法确实是相比之前直接pop固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop时如果队列为空则返回空指针,这样比抛出异常更有好一些

1
2
3
4
5
6
7
8
9
10
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) return nullptr;
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

死锁怎么造成的

死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。
举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::mutex  t_lock1;
std::mutex t_lock2;
int m_1 = 0;
int m_2 = 1;

void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}

void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
std::cout << "dead_lock2 end " << std::endl;
}
}

然后我们启动两个线程

1
2
3
4
5
6
void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}

这样运行之后在某一个时刻一定会导致死锁。
实际工作中避免死锁的一个方式就是将加锁和解锁的功能封装为独立的函数,
这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
std::cout << "lock1 end lock" << std::endl;
}

void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
std::cout << "lock2 end lock" << std::endl;
}

void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

void safe_lock2() {
while (true) {
atomic_lock2();
atomic_lock1();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}

void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}

同时加锁

当我们无法避免在一个函数内部使用两个互斥量,并且都要解锁的情况,那我们可以采取同时加锁的方式。我们先定义一个类,假设这个类不推荐拷贝构造,但我们也提供了这个类的拷贝构造和移动构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class som_big_object {
public:
som_big_object(int data) :_data(data) {}
//拷贝构造
som_big_object(const som_big_object& b2) :_data(b2._data) {
_data = b2._data;
}
//移动构造
som_big_object(som_big_object&& b2) :_data(std::move(b2._data)) {

}
//重载输出运算符
friend std::ostream& operator << (std::ostream& os, const som_big_object& big_obj) {
os << big_obj._data;
return os;
}

//重载赋值运算符
som_big_object& operator = (const som_big_object& b2) {
if (this == &b2) {
return *this;
}
_data = b2._data;
return *this;
}

//交换数据
friend void swap(som_big_object& b1, som_big_object& b2) {
som_big_object temp = std::move(b1);
b1 = std::move(b2);
b2 = std::move(temp);
}
private:
int _data;
};

接下来我们定义一个类对上面的类做管理,为防止多线程情况下数据混乱, 包含了一个互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
class big_object_mgr {
public:
big_object_mgr(int data = 0) :_obj(data) {}
void printinfo() {
std::cout << "current obj data is " << _obj << std::endl;
}
friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2);
private:
std::mutex _mtx;
som_big_object _obj;
};

为了方便演示哪些交换是安全的,哪些是危险的,所以写了三个函数。

1
2
3
4
5
6
7
8
9
10
11
12
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock_guard <std::mutex> gurad1(objm1._mtx);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> guard2(objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

danger_swap是危险的交换方式。比如如下调用

1
2
3
4
5
6
7
8
9
10
11
12
void  test_danger_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);

std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();

objm1.printinfo();
objm2.printinfo();
}

这种调用方式存在隐患,因为danger_swap函数在两个线程中使用会造成互相竞争加锁的情况。
那就需要用锁同时锁住两个锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}

std::lock(objm1._mtx, objm2._mtx);
//领养锁管理它自动释放
std::lock_guard <std::mutex> gurad1(objm1._mtx, std::adopt_lock);

//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));

std::lock_guard <std::mutex> gurad2(objm2._mtx, std::adopt_lock);

swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

比如下面的调用就是合理的

1
2
3
4
5
6
7
8
9
10
11
12
void test_safe_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);

std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();

objm1.printinfo();
objm2.printinfo();
}

当然上面加锁的方式可以简化,C++17 scope_lock可以对多个互斥量同时加锁,并且自动释放

1
2
3
4
5
6
7
8
9
10
11
12
13
//上述代码可以简化为以下方式
void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}

std::scoped_lock guard(objm1._mtx, objm2._mtx);
//等价于
//std::scoped_lock<std::mutex, std::mutex> guard(objm1._mtx, objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

层级锁

现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁,所以可以自定义一个层级锁,保证实际项目中对多个互斥量加锁时是有序的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//层级锁
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}
hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;
void lock() {
check_for_hierarchy_violation();
_internal_mutex.lock();
update_hierarchy_value();
}

void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}

_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}

bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}

update_hierarchy_value();
return true;
}
private:
std::mutex _internal_mutex;
//当前层级值
unsigned long const _hierarchy_value;
//上一次层级值
unsigned long _previous_hierarchy_value;
//本线程记录的层级值
static thread_local unsigned long _this_thread_hierarchy_value;

void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}

void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}
};

thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);

void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000);
hierarchical_mutex hmtx2(500);
std::thread t1([&hmtx1, &hmtx2]() {
hmtx1.lock();
hmtx2.lock();
hmtx2.unlock();
hmtx1.unlock();
});

std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock();
hmtx1.lock();
hmtx1.unlock();
hmtx2.unlock();
});

t1.join();
t2.join();
}

层级锁能保证我们每个线程加锁时,一定是先加权重高的锁。
并且释放时也保证了顺序。
主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁。

总结

本文介绍了线程互斥的常见问题和基本处理方案,在实际开发中,根据具体情境具体分析。

视频链接https://space.bilibili.com/271469206/channel/collectiondetail?sid=1623290

源码链接https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(2) 线程管控

Posted on 2023-08-12 | In C++

简介

本节介绍C++线程管控,包括移交线程的归属权,线程并发数量控制以及获取线程id等基本操作。

线程归属权

我们之前介绍了线程可以通过detach在后台运行或者让开辟这个线程的父线程等待该线程完成。
但每个线程都应该有其归属权,也就是归属给某个变量管理。比如

1
2
3
4
5
void some_function() {

}

std::thread t1(some_function);

t1是一个线程变量,管理一个线程,该线程执行some_function()
对于std::thread C++ 不允许其执行拷贝构造和拷贝赋值, 所以只能通过移动和局部变量返回的方式将线程变量管理的线程转移给其他变量管理。
C++ 中类似的类型还有std::mutex, std::ifstream, std::unique_ptr。
比如下面,我们说明了线程归属权的转移方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

void some_other_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

//t1 绑定some_function
std::thread t1(some_function);
//2 转移t1管理的线程给t2,转移后t1无效
std::thread t2 = std::move(t1);
//3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
//4 创建一个线程变量t3
std::thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
//6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));

上面的代码会引发崩溃,是因为步骤6造成的崩溃。
让主函数睡眠2000秒,是为了告诉规避主函数退出引发崩溃的问题,因为我们在之前给大家演示过,如果线程不detach或者join,主线程退出时会引发崩溃,而我们这些线程没有join和detach,为了给大家演示是因为步骤6引发的崩溃,所以让主线程睡眠2000秒暂时不退出,但是程序仍然会崩溃,说明是步骤6导致的崩溃。

上面代码将t2管理的线程交给t3
之后将t3管理的线程交给t1,此时t1管理线程运行着 some_function
步骤6导致崩溃的原因就是将t3管理的线程交给t1,而此时t1正在管理线程运行some_other_function。
所以我们可以得出一个结论,就是不要将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。

和std::unique_ptr一样,我们可以在函数内部返回一个局部的std::thread变量。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
std::thread  f() {
return std::thread(some_function);
}

void param_function(int a) {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}

std::thread g() {
std::thread t(param_function, 43);
}

因为C++ 在返回局部变量时,会优先寻找这个类的拷贝构造函数,如果没有就会使用这个类的移动构造函数。

joining_thread

曾经有一份C++17标准的备选提案,主张引入新的类joining_thread,它与std::thread类似,但只要其执行析构函数,线程即能自动汇合,这点与scoped_thread非常像。可惜C++标准委员会未能达成共识,结果C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。除去这些,实际上joining_thread类的代码相对容易编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}

joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}


~joining_thread() noexcept {
if (joinable()) {
join();
}
}

void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}

std::thread::id get_id() const noexcept {
return _t.get_id();
}

bool joinable() const noexcept {
return _t.joinable();
}

void join() {
_t.join();
}

void detach() {
_t.detach();
}

std::thread& as_thread() noexcept {
return _t;
}

const std::thread& as_thread() const noexcept {
return _t;
}
};

使用起来比较简单,我们直接构造一个joining_thread对象即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void use_jointhread() {
//1 根据线程构造函数构造joiningthread
joining_thread j1([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10);

//2 根据thread构造joiningthread
joining_thread j2(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));

//3 根据thread构造j3
joining_thread j3(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));


//4 把j3赋值给j1,joining_thread内部会等待j1汇合结束后
//再将j3赋值给j1
j1 = std::move(j3);
ervice
}

容器存储

容器存储线程时,比如vector,如果用push_back操作势必会调用std::thread,这样会引发编译错误,因为std::thread没有拷贝构造函数。我们在之前网络编程实现IOServicePool或者IOThreadPool时初始化了多个线程存储在vector中, 采用的时emplace方式,可以直接根据线程构造函数需要的参数构造,这样就避免了调用thread的拷贝构造函数。 类似于这种

1
2
3
4
5
6
7
8
9
10
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
threads.emplace_back(param_function, i);
}

for (auto& entry : threads) {
entry.join();
}
}

选择运行数量

借用C++标准库的std::thread::hardware_concurrency()函数,它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量.
我们可以模拟实现一个并行计算的功能,计算容器内所有元素的和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init; //⇽-- - ①
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread; //⇽-- - ②
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); //⇽-- - ③
unsigned long const block_size = length / num_threads; //⇽-- - ④
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - ⑤
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size); //⇽-- - ⑥
threads[i] = std::thread(//⇽-- - ⑦
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; //⇽-- - ⑧
}
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]); //⇽-- - ⑨

for (auto& entry : threads)
entry.join(); //⇽-- - ⑩
return std::accumulate(results.begin(), results.end(), init); //⇽-- - ⑪
}

void use_parallel_acc() {
std::vector <int> vec(10000);
for (int i = 0; i < 10000; i++) {
vec.push_back(i);
}
int sum = 0;
sum = parallel_accumulate<std::vector<int>::iterator, int>(vec.begin(),
vec.end(), sum);

std::cout << "sum is " << sum << std::endl;
}

上面的代码1处判断要计算的容器内元素为0个则返回。

2处计算最大开辟的线程数,我们预估每个线程计算25个数据长度。

但是我们可以通过std::thread::hardware_concurrency返回cpu的核数,我们期待的是开辟的线

程数小于等于cpu核数,这样才不会造成线程过多时间片切换开销。

所以3处计算了适合开辟线程数的最小值。

4处计算了步长,根据步长移动迭代器然后开辟线程计算。

5处初始化了线程数-1个大小的vector,因为主线程也参与计算,所以这里-1.

6处移动步长,7处开辟线程,8处更新起始位置。

9处为主线程计算。

10 处让所有线程join

11 处最后将所有计算结果再次调用std的accumulate算出结果。

识别线程

所谓识别线程就是获取线程id,可以根据线程id是否相同判断是否同一个线程。
比如我们启动了一个线程,我们可以通过线程变量的get_id()获取线程id

1
2
3
4
5
std::thread t([](){
std::cout << "thread start" << std::endl;
});

t.get_id();

但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这总方式

1
2
3
4
5
std::thread t([](){
std::cout << "in thread id " <<
std::this_thread::get_id() << std::endl;
std::cout << "thread start" << std::endl;
});

总结

本文介绍了线程管控相关的知识,包括线程的join,detach,以及识别线程,归属权转移,如何管理等等。

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn

C++ 并发(1) 线程基础

Posted on 2023-07-31 | In C++

简介

本文主要介绍线程的基本管控,包括线程的发起,等待,异常条件下如何等待以及后台运行等基础操作。

线程发起

线程发起顾名思义就是启动一个线程,C++11标准统一了线程操作,可以在定义一个线程变量后,该变量启动线程执行回调逻辑。如下即可发起一个线程

1
2
3
4
5
6
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);

线程等待

当我们启动一个线程后,线程可能没有立即执行,如果在局部作用域启动了一个线程,或者main函数中,很可能子线程没运行就被回收了,回收时会调用线程的析构函数,执行terminate操作。所以为了防止主线程退出或者局部作用域结束导致子线程被析构的情况,我们可以通过join,让主线程等待子线程启动运行,子线程运行结束后主线程再运行。

1
2
3
4
5
std::string hellostr = "hello world!";
//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);
//2 主线程等待子线程退出
t1.join();

仿函数作为参数

当我们用仿函数作为参数传递给线程时,也可以达到启动线程执行某种操作的含义

1
2
3
4
5
6
class background_task {
public:
void operator()(std::string str) {
std::cout << "str is " << str << std::endl;
}
};

如果采用如下方式启动函数,那一定会报错的。

1
2
std::thread t2(background_task());
t2.join();

因为编译器会将t2当成一个函数对象, 返回一个std::thread类型的值, 函数的参数为一个函数指针,该函数指针返回值为background_task, 参数为void。可以理解为如下

1
"std::thread (*)(background_task (*)())"

修改的方式很简单

1
2
3
4
5
6
7
//可多加一层()
std::thread t2((background_task()));
t2.join();

//可使用{}方式初始化
std::thread t3{ background_task() };
t3.join();

lambda表达式

lambda 表达式也可以作为线程的参数传递给thread

1
2
3
4
5
6

std::thread t4([](std::string str) {
std::cout << "str is " << str << std::endl;
}, hellostr);

t4.join();

线程detach

线程允许采用分离的方式在后台独自运行,C++ concurrent programing书中称其为守护线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

struct func {
int& _i;
func(int & i): _i(i){}
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i;
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
};

void oops() {

int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread(myfunc);
//隐患,访问局部变量,局部变量可能会随着}结束而回收或随着主线程退出而回收
functhread.detach();
}

// detach 注意事项
oops();
//防止主线程退出过快,需要停顿一下,让子线程跑起来detach
std::this_thread::sleep_for(std::chrono::seconds(1));

上面的例子存在隐患,因为some_local_state是局部变量, 当oops调用结束后局部变量some_local_state就可能被释放了,而线程还在detach后台运行,容易出现崩溃。
所以当我们在线程中使用局部变量的时候可以采取几个措施解决局部变量的问题

  • 通过智能指针传递参数,因为引用计数会随着赋值增加,可保证局部变量在使用期间不被释放,这也就是我们之前提到的伪闭包策略。
  • 将局部变量的值作为参数传递,这么做需要局部变量有拷贝复制的功能,而且拷贝耗费空间和效率。
  • 将线程运行的方式修改为join,这样能保证局部变量被释放前线程已经运行结束。但是这么做可能会影响运行逻辑。
    比如下面的修改
    1
    2
    3
    4
    5
    6
    7
    8
    9
    void use_join() {
    int some_local_state = 0;
    func myfunc(some_local_state);
    std::thread functhread(myfunc);
    functhread.join();
    }

    // join 用法
    use_join();

    异常处理

当我们启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行一些重要的操作比如将充值信息入库等,丢失这些信息是很危险的。所以常用的做法是捕获异常,并且在异常情况下保证子线程稳定运行结束后,主线程抛出异常结束运行。如下面的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void catch_exception() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread{ myfunc };
try {
//本线程做一些事情,可能引发崩溃
std::this_thread::sleep_for(std::chrono::seconds(1));
}catch (std::exception& e) {
functhread.join();
throw;
}

functhread.join();
}

但是用这种方式编码,会显得臃肿,可以采用RAII技术,保证线程对象析构的时候等待线程运行结束,回收资源。如果大家还记得我基于asio实现异步服务时,逻辑处理类LogicSystem的析构函数里等待线程退出。那我们写一个简单的线程守卫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class thread_guard {
private:
std::thread& _t;
public:
explicit thread_guard(std::thread& t):_t(t){}
~thread_guard() {
//join只能调用一次
if (_t.joinable()) {
_t.join();
}
}

thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};

可以这么使用

1
2
3
4
5
6
7
8
9
10
void auto_guard() {
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
//本线程做一些事情
std::cout << "auto guard finished " << std::endl;
}

auto_guard();

慎用隐式转换

C++中会有一些隐式转换,比如char* 转换为string等。这些隐式转换在线程的调用上可能会造成崩溃问题

1
2
3
4
5
6
7
8
9
10
void danger_oops(int som_param) {
char buffer[1024];
sprintf(buffer, "%i", som_param);
//在线程内部将char const* 转化为std::string
//指针常量 char const* 指针本身不能变
//常量指针 const char * 指向的内容不能变
std::thread t(print_str, 3, buffer);
t.detach();
std::cout << "danger oops finished " << std::endl;
}

当我们定义一个线程变量thread t时,传递给这个线程的参数buffer会被保存到thread的成员变量中。
而在线程对象t内部启动并运行线程时,参数才会被传递给调用函数print_str。
而此时buffer可能随着}运行结束而释放了。
改进的方式很简单,我们将参数传递给thread时显示转换为string就可以了,
这样thread内部保存的是string类型。

1
2
3
4
5
6
void safe_oops(int some_param) {
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(print_str, 3, std::string(buffer));
t.detach();
}

关于为什么参数会像我说的这样保存和调用,我在之后会按照源码给大家讲一讲。

引用参数

当线程要调用的回调函数参数为引用类型时,需要将参数显示转化为引用对象传递给线程的构造函数,
如果采用如下调用会编译失败

1
2
3
4
5
6
7
8
9
10
11
12

void change_param(int& param) {
param++;
}

void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, some_param);
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

即使函数change_param的参数为int&类型,我们传递给t2的构造函数为some_param,也不会达到在change_param函数内部修改关联到外部some_param的效果。因为some_param在传递给thread的构造函数后会转变为右值保存,右值传递给一个左值引用会出问题,所以编译出了问题。
改为如下调用就可以了

1
2
3
4
5
6
7
void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, std::ref(some_param));
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

thread原理

为了详细说明thread参数传递和调用原理,我们看看源码

1
2
3
4
template <class _Fn, class... _Args, enable_if_t<!is_same_v<_Remove_cvref_t<_Fn>, thread>, int> = 0>
_NODISCARD_CTOR explicit thread(_Fn&& _Fx, _Args&&... _Ax) {
_Start(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
}

thread构造函数内部通过forward原样转换传递给_Start函数。关于原样转换的知识可以看我之前写的文章。
_Start 函数内部就是启动了一个线程_beginthreadex执行回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
template <class _Fn, class... _Args>
void _Start(_Fn&& _Fx, _Args&&... _Ax) {
using _Tuple = tuple<decay_t<_Fn>, decay_t<_Args>...>;
auto _Decay_copied = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>(_Ax)...);
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

#pragma warning(push)
#pragma warning(disable : 5039) // pointer or reference to potentially throwing function passed to
// extern C function under -EHc. Undefined behavior may occur
// if this function throws an exception. (/Wall)
_Thr._Hnd =
reinterpret_cast<void*>(_CSTD _beginthreadex(nullptr, 0, _Invoker_proc, _Decay_copied.get(), 0, &_Thr._Id));
#pragma warning(pop)

if (_Thr._Hnd) { // ownership transferred to the thread
(void) _Decay_copied.release();
} else { // failed to start thread
_Thr._Id = 0;
_Throw_Cpp_error(_RESOURCE_UNAVAILABLE_TRY_AGAIN);
}
}

我们对应ref_oops内部函数的调用,_Start的参数就是void _Start(change_param&& _Fx, int&& Ax)

_beginthreadex函数参数分别是安全参数,栈大小,调用函数地址,调用函数参数,初始标记,第三方参数地址。
我们关注_beginthreadex的调用函数和参数,调用函数为_Invoker_proc,参数为_Decay_copied
我们看看这两个变量的定义

1
2
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<_Fn>(_Fx), _STD forward<_Args>
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(_Args)>{});

_Decay_copied 可以理解为

1
auto _Decay_copied           = _STD make_unique<_Tuple>(_STD forward<change_param>(_Fx), _STD forward<int>

_Invoker_proc 可以理解为封装的可调用对象

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<1 + sizeof...(int)>{});

我们做一个简化

1
constexpr auto _Invoker_proc = _Get_invoke<_Tuple>(make_index_sequence<2>{});

其实就是将参数的索引0,1按照序列传递给_Get_invoke
_Get_invoke原型为

1
2
3
_NODISCARD static constexpr auto _Get_invoke(index_sequence<_Indices...>) noexcept {
return &_Invoke<_Tuple, _Indices...>;
}

所以_Get_invoke函数原型为

1
2
3
4
5
6
7
8
9
template <class _Tuple, size_t... _Indices>
static unsigned int __stdcall _Invoke(void* _RawVals) noexcept /* terminates */ {
// adapt invoke of user's callable object to _beginthreadex's thread procedure
const unique_ptr<_Tuple> _FnVals(static_cast<_Tuple*>(_RawVals));
_Tuple& _Tup = *_FnVals;
_STD invoke(_STD move(_STD get<_Indices>(_Tup))...);
_Cnd_do_broadcast_at_thread_exit(); // TRANSITION, ABI
return 0;
}

所以我们可以理解为调用_Get_invoke就是调用invoke(_STD move(_STD get<_Indices>(_Tup))...);
invoke(_STD move(_STD get<_Indices>(_Tup))...);就是将回调函数和参数传递给invoke

1
2
3
CONSTEXPR17 auto invoke(_Callable&& _Obj, _Ty1&& _Arg1, _Types2&&... _Args2) noexcept(
noexcept(_Invoker1<_Callable, _Ty1>::_Call(
static_cast<_Callable&&>(_Obj), static_cast<_Ty1&&>(_Arg1), static_cast<_Types2&&>(_Args2)...)))

invoke实际就是调用了_Call函数,_Call的作用就是调用回调函数,并传递给回调函数参数,可以理解为向change_param
传递int类型的右值数据

1
change_param(int&& _Arg1)

这与change_param的定义不符合,change_param参数为左值引用, 不能绑定右值,也就是编译错误的原因。

所以在这里大家就理解了传递给thread 的参数都是按照右值的方式构造为Tuple类型,传递给系统级别函数_beginthreadex调用的。
那为什么使用std::ref就可以实现引用效果呢?
这里我们看下std::ref的源码

1
2
3
4
template <class _Ty>
_NODISCARD _CONSTEXPR20 reference_wrapper<_Ty> ref(_Ty& _Val) noexcept {
return reference_wrapper<_Ty>(_Val);
}

reference_wrapper是一个类类型,说白了就是将参数的地址和类型保存起来。

1
2
3
4
_CONSTEXPR20 reference_wrapper(_Uty&& _Val) noexcept(noexcept(_Refwrap_ctor_fun<_Ty>(_STD declval<_Uty>()))) {
_Ty& _Ref = static_cast<_Uty&&>(_Val);
_Ptr = _STD addressof(_Ref);
}

当我们要使用这个类对象时,自动转化为取内部参数的地址里的数据即可,就达到了和实参关联的效果

1
2
3
4
5
6
7
 _CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}

_NODISCARD _CONSTEXPR20 _Ty& get() const noexcept {
return *_Ptr;
}

所以我们可以这么理解传递给thread对象构造函数的参数,仍然作为右值被保存,如ref(int)实际是作为reference_wrapper(int)对象保存在threa的类成员里。
而调用的时候触发了仿函数()进而获取到外部实参的地址内的数据。

绑定类成员函数

有时候我们需要绑定一个类的成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
class X
{
public:
void do_lengthy_work() {
std::cout << "do_lengthy_work " << std::endl;
}
};

void bind_class_oops() {
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);
t.join();
}

这里大家注意一下,如果thread绑定的回调函数是普通函数,可以在函数前加&或者不加&,因为编译器默认将普通函数名作为函数地址,如下两种写法都正确

1
2
3
4
5
6
7
8
void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}

std::string hellostr = "hello world!";
//两种方式都正确
std::thread t1(thead_work1, hellostr);
std::thread t2(&thead_work1, hellostr);

但是如果是绑定类的成员函数,必须添加&

使用move操作

有时候传递给线程的参数是独占的,所谓独占就是不支持拷贝赋值和构造,但是我们可以通过std::move的方式将参数的所有权转移给线程,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void deal_unique(std::unique_ptr<int> p) {
std::cout << "unique ptr data is " << *p << std::endl;
(*p)++;

std::cout << "after unique ptr data is " << *p << std::endl;
}

void move_oops() {
auto p = std::make_unique<int>(100);
std::thread t(deal_unique, std::move(p));
t.join();
//不能再使用p了,p已经被move废弃
// std::cout << "after unique ptr data is " << *p << std::endl;
}

总结

本文介绍了std::thread的基本操作,具体视频可以去B站看看我的C++视频讲解

https://space.bilibili.com/271469206/channel/collectiondetail?sid=313101&ctype=0

代码链接

源码链接https://gitee.com/secondtonone1/boostasio-learn

Linux C++ 安装和使用grpc和jsoncpp库

Posted on 2023-07-30 | In C++

简介

本文主要教会大家如何在Linux环境搭建C++ 所需的grpc和jsoncpp库,并教会大家如何编写cmake,并配置使用这些库。

解决vim乱码

为了解决Linux环境下打开vim中文乱码的问题
用vim打开用户目录下的vim配置文件

1
vim ~/.vimrc

配置如下

1
2
3
set termencoding=utf-8
set encoding=utf8
set fileencodings=utf8,ucs-bom,gbk,cp936,gb2312,gb18030

配置和使用jsoncpp

如果你是ubuntu系统,可以通过如下命令直接安装

1
可以通过指令apt install  libjsoncpp-dev 安装

但是如果是其他Linux系统最好是手动安装源码包,我的操作都是在ubuntu为基础镜像生成的docker中进行的,如果大家使用的是宿主机,可以直接安装。
推荐源码安装,去github下载

https://github.com/open-source-parsers/jsoncpp/releases

可以选择用wget 命令
我在电脑下好后传到云服务器上
然后在云服务器上copy到docker里, 如果你是在宿主机进行的,可以略去这一步。

1
docker cp /home/ubuntu/download/jsoncpp-1.9.5.tar.gz  cppubuntu:/test

进入容器

1
docker exec -it cppubuntu /bin/bash

接下来解压压缩包,无论docker还是宿主机内,都需执行如下命令

1
tar zxvf ./jsoncpp-1.9.5.tar.gz 

进入到源码目录

1
cd  ./json

创建目录

1
mkdir build

进入目录

1
cd build

执行cmake生成makefile

1
cmake ../

执行make

1
make

执行安装

1
make install

更新库

1
ldconfig

写一个jsoncpp的测试cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include<json/json.h>
#include<iostream>
using namespace std;

int main(int argc, char** argv)
{
Json::Value root;
Json::FastWriter fast;
root["ModuleType"]= Json::Value("你好");
root["ModuleCode"]= Json::Value("22");
root["ModuleDesc"]= Json::Value("33");
root["DateTime"]= Json::Value("44");
root["LogType"]= Json::Value("55");
cout<<fast.write(root)<<endl;
return 0;
}

执行 编译

1
g++ jsontest.cpp  -o jsontest -ljsoncpp

或者写个cmake

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cmake_minimum_required(VERSION 3.12)
project(jsontest)

# 设置 C++ 标准
set(CMAKE_CXX_STANDARD 17)

# 添加可执行文件和源文件
file(GLOB SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
set(JSONCPP_INC_DIR /usr/local/include)

add_executable(jsontest ${SOURCES})

# 包含头文件路径(包括其他目录)
target_include_directories(jsontest
PRIVATE
${JSONCPP_INC_DIR}
)

# 链接 Boost 库
target_link_libraries(jsontest PRIVATE jsoncpp)

运行 ./jsontest
输出

1
{"DateTime":"44","LogType":"55","ModuleCode":"22","ModuleDesc":"33","ModuleType":"\u4f60\u597d"}

grpc配置和使用

克隆grpc指定分支

1
git clone -b v1.34.0 https://gitee.com/mirrors/grpc-framework.git grpc

进入目录并更新子模块

1
2
cd grpc
git submodule update --init

编译并生成grpc库

1
2
3
4
5
6
7
cd grpc
mkdir build
cd build
// 指定安装路径 /usr/local
cmake -DCMAKE_INSTALL_PREFIX=/usr/local ..
make -j2
sudo make install

测试安装成功与否

编译源代码中的helloworld文件夹下的文件,步骤如下:

1
2
3
4
5
6
7
#进入grpc文件夹下
cd examples/cpp/helloworld
mkdir build
cd build
# 编译
cmake ..
make -j8

编译完成后,分别执行greeter_server和greeter_client即可测试。

项目应用grpc

我们的项目中也用到了grpc, 需要编写一个CMakeLists.txt 配置grpc。
大家可以克隆我的boost项目代码

https://gitee.com/secondtonone1/boostasio-learn

进入day19-Grpc-Server目录,我们编写如下的CMakeLists.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
cmake_minimum_required(VERSION 3.1)

project(GrpcServer LANGUAGES CXX)

set(CMAKE_INCLUDE_CURRENT_DIR ON)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

#假设已经安装好grpc了
find_package(Threads REQUIRED)

set(protobuf_MODULE_COMPATIBLE TRUE)
find_package(Protobuf CONFIG REQUIRED)
message(STATUS "Using protobuf ${Protobuf_VERSION}")

set(_PROTOBUF_LIBPROTOBUF protobuf::libprotobuf)
set(_REFLECTION gRPC::grpc++_reflection)


# Find gRPC installation
# Looks for gRPCConfig.cmake file installed by gRPC's cmake installation.
find_package(gRPC CONFIG REQUIRED)
message(STATUS "Using gRPC ${gRPC_VERSION}")

set(_GRPC_GRPCPP gRPC::grpc++)

# 添加可执行文件和源文件
file(GLOB SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
file(GLOB PBSOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cc)


add_executable(GrpcServer ${SOURCES}
${PBSOURCES})

target_link_libraries(GrpcServer
${_REFLECTION}
${_GRPC_GRPCPP}
${_PROTOBUF_LIBPROTOBUF})

我们新建一个build文件夹

1
mkdir build

进入build文件夹里,执行cmake .., 再执行make即可。
直接执行GrpcServer就可以看到我们的程序跑起来了。

总结

本文介绍了如何使用Linux环境下配置和使用grpc和cppjson库,我将docker打包为镜像提交到了网盘上,大家感兴趣可以下载看看

链接
提取码:5wng

可变参数模板+异步队列实现异步打印功能

Posted on 2023-07-23 | In C++

场景

公司有成熟的日志库,但是打印日志的功能是在调用线程里触发的,简而言之就是哪个线程调用了日志库的打印功能,就在哪个线程里输出,如果打印功能频繁,可能会影响该线程的逻辑处理。目前公司的软件界面显示和计算等逻辑很频繁,考虑用异步的方式实现打印。

Read more »

利用Docker搭建Linux C++ Boost开发环境

Posted on 2023-07-21 | In C++

简介

本文介绍如何使用Docker搭建Linux环境下C++的开发环境。众所周知C++的开发环境分为Windows和Linux两种,在windows配置C++开发环境并实现了Asio服务器的开发,我们也做过QT的配置和使用,前面的教程已经很完善了,接下来介绍如何在Linux系统部署C++开发环境。

Read more »

网络答疑汇总

Posted on 2023-07-15 | In C++

简介

总结下目前视频教程中读者提出的问题,并逐一回答。视频地址在哔哩哔哩
https://space.bilibili.com/271469206

Read more »

beast网络库实现websocket服务器

Posted on 2023-06-25 | In C++

简介

使用beast网络库实现websocket服务器,一般来说websocket是一个长连接的协议,但是自动包含了解包处理,当我们在浏览器输入一个http请求时如果是以ws开头的如ws://127.0.0.1:9501就是请求本地9501端口的websocket服务器处理。而beast为我们提供了websocket的处理方案,我们可以在http服务器的基础上升级协议为websocket,处理部分websocket请求。如果服务器收到的是普通的http请求则按照http请求处理。我们可以从官方文档中按照示例逐步搭建websocket服务器。

Read more »

beast网络库搭建http服务器

Posted on 2023-06-24 | In C++

简介

前面的几篇文章已经介绍了如何使用asio搭建高并发的tcp服务器,以及http服务器。但是纯手写http服务器太麻烦了,有网络库beast已经帮我们实现了。这一期讲讲如何使用beast实现一个http服务器。

连接类

我们先实现http_server函数

1
2
3
4
5
6
7
8
9
10
void http_server(tcp::acceptor& acceptor, tcp::socket& socket)
{
acceptor.async_accept(socket,
[&](beast::error_code ec)
{
if (!ec)
std::make_shared<http_connection>(std::move(socket))->start();
http_server(acceptor, socket);
});
}
Read more »

asio实现http服务器

Posted on 2023-06-23 | In C++

简介

前文介绍了asio如何实现并发的长连接tcp服务器,今天介绍如何实现http服务器,在介绍实现http服务器之前,需要讲述下http报文头的格式,其实http报文头的格式就是为了避免我们之前提到的粘包现象,告诉服务器一个数据包的开始和结尾,并在包头里标识请求的类型如get或post等信息。

Read more »
<1…91011…37>

370 posts
17 categories
21 tags
RSS
GitHub ZhiHu
© 2025 恋恋风辰 本站总访问量次 | 本站访客数人
Powered by Hexo
|
Theme — NexT.Muse v5.1.3