处理粘包问题

粘包问题

今天介绍一下如何处理粘包,粘包问题是服务器收发数据常遇到的一个现象,下面我们介绍一下粘包问题是什么,当客户端发送多个数据包给服务器时,服务器底层的tcp接收缓冲区收到的数据为粘连在一起的,如下图所示

https://cdn.llfc.club/9714C5D1A5B9.png
当客户端发送两个Hello World!给服务器,服务器TCP接收缓冲区接收了两次,一次是Hello World!Hello, 第二次是World!。

粘包原因

因为TCP底层通信是面向字节流的,TCP只保证发送数据的准确性和顺序性,字节流以字节为单位,客户端每次发送N个字节给服务端,N取决于当前客户端的发送缓冲区是否有数据,比如发送缓冲区总大小为10个字节,当前有5个字节数据(上次要发送的数据比如’loveu’)未发送完,那么此时只有5个字节空闲空间,我们调用发送接口发送hello world!其实就是只能发送Hello给服务器,那么服务器一次性读取到的数据就很可能是loveuhello。而剩余的world!只能留给下一次发送,下一次服务器接收到的就是world!
如下图
https://cdn.llfc.club/1682251035380.jpg
这是最好理解的粘包问题的产生原因。还有一些其他的原因比如
1   客户端的发送频率远高于服务器的接收频率,就会导致数据在服务器的tcp接收缓冲区滞留形成粘连,比如客户端1s内连续发送了两个hello world!,服务器过了2s才接收数据,那一次性读出两个hello world!。
2   tcp底层的安全和效率机制不允许字节数特别少的小包发送频率过高,tcp会在底层累计数据长度到一定大小才一起发送,比如连续发送1字节的数据要累计到多个字节才发送,可以了解下tcp底层的Nagle算法。
3   再就是我们提到的最简单的情况,发送端缓冲区有上次未发送完的数据或者接收端的缓冲区里有未取出的数据导致数据粘连。

处理粘包

处理粘包的方式主要采用应用层定义收发包格式的方式,这个过程俗称切包处理,常用的协议被称为tlv协议(消息id+消息长度+消息内容),如下图
https://cdn.llfc.club/1683367901552.jpg
为保证大家容易理解,我们先简化发送的格式,格式变为消息长度+消息内容的方式,之后再完善为tlv格式。
简化后的结构如下图
https://cdn.llfc.club/1683368829739.jpg

完善消息节点

之前我们设计过消息节点的数据结构MsgNode,这里需要完善一下.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MsgNode
{
friend class CSession;
public:
MsgNode(char * msg, short max_len):_total_len(max_len + HEAD_LENGTH),_cur_len(0){
_data = new char[_total_len+1]();
memcpy(_data, &max_len, HEAD_LENGTH);
memcpy(_data+ HEAD_LENGTH, msg, max_len);
_data[_total_len] = '\0';
}

MsgNode(short max_len):_total_len(max_len),_cur_len(0) {
_data = new char[_total_len +1]();
}

~MsgNode() {
delete[] _data;
}

void Clear() {
::memset(_data, 0, _total_len);
_cur_len = 0;
}
private:
short _cur_len;
short _total_len;
char* _data;
};

1   两个参数的构造函数做了完善,之前的构造函数通过消息首地址和长度构造节点数据,现在需要在构造节点的同时把长度信息也写入节点,该构造函数主要用来发送数据时构造发送信息的节点。
2   一个参数的构造函数为较上次新增的,主要根据消息的长度构造消息节点,该构造函数主要是接收对端数据时构造接收节点调用的。
3   新增一个Clear函数清除消息节点的数据,主要是避免多次构造节点造成开销。

CSession类完善

为能够对收到的数据切包处理,需要定义一个消息接收节点,一个bool类型的变量表示头部是否解析完成,以及将处理好的头部先缓存起来的结构。

1
2
3
4
5
//收到的消息结构
std::shared_ptr<MsgNode> _recv_msg_node;
bool _b_head_parse;
//收到的头部结构
std::shared_ptr<MsgNode> _recv_head_node;

_recv_msg_node用来存储接受的消息体信息
_recv_head_node用来存储接收的头部信息
_b_head_parse表示是否处理完头部信息

同时我们新增一个HEAD_LENGTH变量表示数据包头部的大小,修改原消息最大长度为1024*2

1
2
#define MAX_LENGTH  1024*2
#define HEAD_LENGTH 2

完善接收逻辑

我们需要修改HandleRead函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
if (!error) {
//已经移动的字符数
int copy_len = 0;
while (bytes_transferred>0) {
if (!_b_head_parse) {
//收到的数据不足头部大小
if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) {
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+ copy_len, bytes_transferred);
_recv_head_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
//收到的数据比头部多
//头部剩余未复制的长度
int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len;
memcpy(_recv_head_node->_data + _recv_head_node->_cur_len, _data+copy_len, head_remain);
//更新已处理的data长度和剩余未处理的长度
copy_len += head_remain;
bytes_transferred -= head_remain;
//获取头部数据
short data_len = 0;
memcpy(&data_len, _recv_head_node->_data, HEAD_LENGTH);
cout << "data_len is " << data_len << endl;
//头部长度非法
if (data_len > MAX_LENGTH) {
std::cout << "invalid data length is " << data_len << endl;
_server->ClearSession(_uuid);
return;
}
_recv_msg_node = make_shared<MsgNode>(data_len);

//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里
if (bytes_transferred < data_len) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
//头部处理完成
_b_head_parse = true;
return;
}

memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, data_len);
_recv_msg_node->_cur_len += data_len;
copy_len += data_len;
bytes_transferred -= data_len;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}

//已经处理完头部,处理上次未接受完的消息数据
//接收的数据仍不足剩余未处理的
int remain_msg = _recv_msg_node->_total_len - _recv_msg_node->_cur_len;
if (bytes_transferred < remain_msg) {
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred);
_recv_msg_node->_cur_len += bytes_transferred;
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
memcpy(_recv_msg_node->_data + _recv_msg_node->_cur_len, _data + copy_len, remain_msg);
_recv_msg_node->_cur_len += remain_msg;
bytes_transferred -= remain_msg;
copy_len += remain_msg;
_recv_msg_node->_data[_recv_msg_node->_total_len] = '\0';
cout << "receive data is " << _recv_msg_node->_data << endl;
//此处可以调用Send发送测试
Send(_recv_msg_node->_data, _recv_msg_node->_total_len);
//继续轮询剩余未处理数据
_b_head_parse = false;
_recv_head_node->Clear();
if (bytes_transferred <= 0) {
::memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH),
std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, shared_self));
return;
}
continue;
}
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
Close();
_server->ClearSession(_uuid);
}
}

1   copy_len记录的是已经处理过数据的长度,因为存在一次接收多个包的情况,所以copy_len用来做已经处理的数据长度的。
2   首先判断_b_head_parse是否为false,如果为false则说明头部未处理,先判断接收的数据是否小于头部, 如果小于头部大小则将接收到的数据放入_recv_head_node节点保存,然后继续调用读取函数监听对端发送数据。否则进入步骤3.
3   如果收到的数据比头部多,可能是多个逻辑包,所以要做切包处理。根据之前保留在_recv_head_node的长度,计算出剩余未取出的头部长度,然后取出剩余的头部长度保存在_recv_head_node节点,然后通过memcpy方式从节点拷贝出数据写入short类型的data_len里,进而获取消息的长度。接下来继续处理包体,也就是消息体,判断接收到的数据未处理部分的长度和总共要接收的数据长度大小,如果小于总共要接受的长度,说明消息体没接收完,则将未处理部分先写入_recv_msg_node里,并且继续监听读事件。否则说明消息体接收完全,进入步骤4
4   将消息体数据接收到_recv_msg_node中,接受完全后返回给对端。当然存在多个逻辑包粘连,此时要判断bytes_transferred是否小于等于0,如果是说明只有一个逻辑包,我们处理完了,继续监听读事件,就直接返回即可。否则说明有多个数据包粘连,就继续执行上述操作。

5   因为存在_b_head_parse为true,也就是包头接收并处理完的情况,但是包体未接受完,再次触发HandleRead,此时要继续处理上次未接受完的消息体,大体逻辑和3,4一样。
以上就是处理粘包的过程,我们绘制流程图更明了一些
https://cdn.llfc.club/1683373951566.jpg

客户端修改

客户端的发送也要遵循先发送数据2个字节的数据长度,再发送数据消息的结构。
接收时也是先接收两个字节数据获取数据长度,再根据长度接收消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int main()
{
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造endpoint
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found; ;
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

std::cout << "Enter message: ";
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH);
size_t request_length = strlen(request);
char send_data[MAX_LENGTH] = { 0 };
memcpy(send_data, &request_length, 2);
memcpy(send_data + 2, request, request_length);
boost::asio::write(sock, boost::asio::buffer(send_data, request_length+2));

char reply_head[HEAD_LENGTH];
size_t reply_length = boost::asio::read(sock,boost::asio::buffer(reply_head, HEAD_LENGTH));
short msglen = 0;
memcpy(&msglen, reply_head, HEAD_LENGTH);
char msg[MAX_LENGTH] = { 0 };
size_t msg_length = boost::asio::read(sock,boost::asio::buffer(msg, msglen));

std::cout << "Reply is: ";
std::cout.write(msg, msglen) << endl;
std::cout << "Reply len is " << msglen;
std::cout << "\n";
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
return 0;
}

服务器启动后,启动客户端,然后客户端发送Hello World,服务器收到后打印如下
https://cdn.llfc.club/1683458311421.jpg

粘包测试

为了测试粘包,需要制造粘包产生的现象,可以让客户端发送的频率高一些,服务器接收的频率低一些,这样造成前后端收发数据不一致导致多个数据包在服务器tcp缓冲区滞留产生粘包现象。
测试粘包之前,在服务器的CSession类里添加打印二进制数据的函数,便于查看缓冲区的数据

1
2
3
4
5
6
7
8
9
10
11
void CSession::PrintRecvData(char* data, int length) {
stringstream ss;
string result = "0x";
for (int i = 0; i < length; i++) {
string hexstr;
ss << hex << std::setw(2) << std::setfill('0') << int(data[i]) << endl;
ss >> hexstr;
result += hexstr;
}
std::cout << "receive raw data is : " << result << endl;;
}

然后将这个函数放到HandleRead里,每次收到数据就调用这个函数打印接收到的最原始的数据,然后睡眠2秒再进行收发操作,用来延迟接收对端数据制造粘包,之后的逻辑不变

1
2
3
4
5
6
7
void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, std::shared_ptr<CSession> shared_self){
if (!error) {
PrintRecvData(_data, bytes_transferred);
std::chrono::milliseconds dura(2000);
std::this_thread::sleep_for(dura);
}
}

修改客户端逻辑,实现收发分离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
int main()
{
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造endpoint
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found; ;
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

thread send_thread([&sock] {
for (;;) {
this_thread::sleep_for(std::chrono::milliseconds(2));
const char* request = "hello world!";
size_t request_length = strlen(request);
char send_data[MAX_LENGTH] = { 0 };
memcpy(send_data, &request_length, 2);
memcpy(send_data + 2, request, request_length);
boost::asio::write(sock, boost::asio::buffer(send_data, request_length + 2));
}
});

thread recv_thread([&sock] {
for (;;) {
this_thread::sleep_for(std::chrono::milliseconds(2));
cout << "begin to receive..." << endl;
char reply_head[HEAD_LENGTH];
size_t reply_length = boost::asio::read(sock, boost::asio::buffer(reply_head, HEAD_LENGTH));
short msglen = 0;
memcpy(&msglen, reply_head, HEAD_LENGTH);
char msg[MAX_LENGTH] = { 0 };
size_t msg_length = boost::asio::read(sock, boost::asio::buffer(msg, msglen));

std::cout << "Reply is: ";
std::cout.write(msg, msglen) << endl;
std::cout << "Reply len is " << msglen;
std::cout << "\n";
}
});

send_thread.join();
recv_thread.join();
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
return 0;
}

再次启动服务器和客户端,看到粘包现象了,我们的服务器也能稳定切割数据包并返回正确的消息给客户端。
可以看到服务器收到了大量数据,然后准确切割返回给了客户端。如下图。
https://cdn.llfc.club/1683460029221.jpg

总结

该服务虽然实现了粘包处理,但是服务器仍存在不足,比如当客户端和服务器处于不同平台时收发数据会出现异常,根本原因是未处理大小端模式的问题,这个留给下节处理。
源码链接https://gitee.com/secondtonone1/boostasio-learn