逻辑层设计和消息完善

简介

本文概述基于boost::asio实现的服务器逻辑层结构,并且完善之前设计的消息结构。因为为了简化粘包处理,我们简化了发送数据的结构,这次我们给出完整的消息设计,以及服务器架构设计。

服务器架构设计

之前我们设计了Session(会话层),并且给大家讲述了Asio底层的通信过程,如下图

https://cdn.llfc.club/1685620269385.jpg

我们接下来要设计的服务器结构是这样的

https://cdn.llfc.club/1685621283099.jpg

消息头完善

我们之前的消息头仅包含数据域的长度,但是要进行逻辑处理,就需要传递一个id字段表示要处理的消息id,当然可以不在包头传id字段,将id序列化到消息体也是可以的,但是我们为了便于处理也便于回调逻辑层对应的函数,最好是将id写入包头。
之前我们设计的消息结构是这样的

https://cdn.llfc.club/1683368829739.jpg

现在将其完善为如下的样子

https://cdn.llfc.club/1683367901552.jpg

为了减少耦合和歧义,我们重新设计消息节点。
MsgNode表示消息节点的基类,头部的消息用这个结构存储。
RecvNode表示接收消息的节点。
SendNode表示发送消息的节点。
我们将上述结构定义在MsgNode.h中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class MsgNode
{
public:
MsgNode(short max_len) :_total_len(max_len), _cur_len(0) {
_data = new char[_total_len + 1]();
_data[_total_len] = '\0';
}

~MsgNode() {
std::cout << "destruct MsgNode" << endl;
delete[] _data;
}

void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}

short _cur_len;
short _total_len;
char* _data;
};

class RecvNode :public MsgNode {
public:
RecvNode(short max_len, short msg_id);
private:
short _msg_id;
};

class SendNode:public MsgNode {
public:
SendNode(const char* msg,short max_len, short msg_id);
private:
short _msg_id;
};

实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#include "MsgNode.h"
RecvNode::RecvNode(short max_len, short msg_id):MsgNode(max_len),
_msg_id(msg_id){

}


SendNode::SendNode(const char* msg, short max_len, short msg_id):MsgNode(max_len + HEAD_TOTAL_LEN)
, _msg_id(msg_id){
//先发送id, 转为网络字节序
short msg_id_host = boost::asio::detail::socket_ops::host_to_network_short(msg_id);
memcpy(_data, &msg_id_host, HEAD_ID_LEN);
//转为网络字节序
short max_len_host = boost::asio::detail::socket_ops::host_to_network_short(max_len);
memcpy(_data + HEAD_ID_LEN, &max_len_host, HEAD_DATA_LEN);
memcpy(_data + HEAD_ID_LEN + HEAD_DATA_LEN, msg, max_len);
}

SendNode发送节点构造时,先将id转为网络字节序,然后写入_data数据域。
然后将要发送数据的长度转为大端字节序,写入_data数据域,注意要偏移HEAD_ID_LEN长度。
最后将要发送的数据msg写入_data数据域,注意要偏移HEAD_ID_LEN+HEAD_DATA_LEN

Session类改写

因为消息结构改变了,所以我们接收和发送数据的逻辑要做对应的修改,我们先修改Session类中收发消息结构如下

1
2
3
4
5
6
7
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;

因为头部数据只为4字节,所以我们在Session的构造函数中创建头部节点时选择HEAD_TOTAL_LEN(4字节)大小。

1
2
3
4
5
6
CSession::CSession(boost::asio::io_context& io_context, CServer* server):
_socket(io_context), _server(server), _b_close(false),_b_head_parse(false){
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
_recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);
}

发送时我们构造发送节点,放到队列中即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void CSession::Send(char* msg, short max_length, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}

_send_que.push(make_shared<SendNode>(msg, max_length, msgid));
if (send_que_size>0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

当然我们也实现了一个重载版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void CSession::Send(std::string msg, short msgid) {
std::lock_guard<std::mutex> lock(_send_lock);
int send_que_size = _send_que.size();
if (send_que_size > MAX_SENDQUE) {
std::cout << "session: " << _uuid << " send que fulled, size is " << MAX_SENDQUE << endl;
return;
}

_send_que.push(make_shared<SendNode>(msg.c_str(), msg.length(), msgid));
if (send_que_size > 0) {
return;
}
auto& msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_total_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, SharedSelf()));
}

在接收数据时我们解析头部也要解析id字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
try {
if (!error) {
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred > 0) {
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_TOTAL_LEN) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_TOTAL_LEN - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data + copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
//id非法
if (msg_id > MAX_LENGTH) {
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}
short msg_len = 0;
memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;
//id非法
if (msg_len > MAX_LENGTH) {
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}

_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < msg_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}

memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, msg_len);
_recv_msg_node->_cur_len += msg_len;
copy_len += msg_len;
bytes_transferred -= msg_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}

//已经处理完头部,处理上次未接受完的消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
//cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Json::Reader reader;
Json::Value root;
reader.parse(std::string(_recv_msg_node->_data, _recv_msg_node->_total_len), root);
std::cout << "recevie msg id is " << root["id"].asInt() << " msg data is "
<< root["data"].asString() << endl;
root["data"] = "server has received msg, msg data is " + root["data"].asString();
std::string return_str = root.toStyledString();
Send(return_str, root["id"].asInt());
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}
catch (std::exception& e) {
std::cout << "Exception code is " << e.what() << endl;
}
}

先解析头部id,再解析长度,然后根据id和长度构造消息节点,copy剩下的消息体, 把上面代码中处理消息头的逻辑截取如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
   //获取头部MSGID数据
short msg_id = 0;
memcpy(&msg_id, _recv_head_node->_data, HEAD_ID_LEN);
//网络字节序转化为本地字节序
msg_id = boost::asio::detail::socket_ops::network_to_host_short(msg_id);
std::cout << "msg_id is " << msg_id << endl;
//id非法
if (msg_id > MAX_LENGTH) {
std::cout << "invalid msg_id is " << msg_id << endl;
_server->ClearSession(_uuid);
return;
}

short msg_len = 0;
memcpy(&msg_len, _recv_head_node->_data+HEAD_ID_LEN, HEAD_DATA_LEN);
//网络字节序转化为本地字节序
msg_len = boost::asio::detail::socket_ops::network_to_host_short(msg_len);
std::cout << "msg_len is " << msg_len << endl;
//id非法
if (msg_len > MAX_LENGTH) {
std::cout << "invalid data length is " << msg_len << endl;
_server->ClearSession(_uuid);
return;
}

_recv_msg_node = make_shared<RecvNode>(msg_len, msg_id);

其余的没什么变动。

总结

本文介绍了服务器逻辑和网络层的设计,并且基于这个架构,完善了消息发送结构,下一篇带着大家设计逻辑类和逻辑队列。

源码链接https://gitee.com/secondtonone1/boostasio-learn