封装发送队列

简介

前文介绍了通过智能指针实现伪闭包的方式延长了session的生命周期,而实际使用的服务器并不是应答式,而是全双工通信方式,服务器一直监听写事件,接收对端数据,可随时发送数据给对端,今天介绍如何封装异步的发送接口,因为多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的,这一点我们已经在前文异步发送函数介绍的时候提到了。

Server和Session分离

将Server修改为CServer并分离到CServer.h中,然后将Session修改为CSession分离到CSession.h中。
CSession.h中类的声明如下,和之前的Session内容一样,就是修改了类名,放在CSession.h中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <boost/asio.hpp>
#include <map>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
using boost::asio::ip::tcp;
using namespace std;
class CServer;
class CSession :public std::enable_shared_from_this<CSession>
{
public:
CSession(boost::asio::io_context& ioc, CServer* server) :_socket(ioc), _server(server) {
boost::uuids::uuid a_uuid = boost::uuids::random_generator()();
_uuid = boost::uuids::to_string(a_uuid);
}
tcp::socket& Socket() {
return _socket;
}

~CSession() {
std::cout << "session destruct delete this " << this << endl;
}

void Start();
std::string& GetUuid();
private:
void handle_read(const boost::system::error_code& error,
size_t bytes_transferred, shared_ptr<CSession> _self_shared);
void handle_write(const boost::system::error_code& error, shared_ptr<CSession> _self_shared);
tcp::socket _socket;
enum { max_length = 1024 };
char _data[max_length];
CServer* _server;
std::string _uuid;
};

CServer.h中声明如下,内容前文没变化,就是将Server内容写入CServer.h中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <boost/asio.hpp>
#include "CSession.h"
#include <memory.h>
#include <map>
using namespace std;
using boost::asio::ip::tcp;
class CServer
{
public:
CServer(boost::asio::io_context& io_context, short port);
void ClearSession(std::string);
private:
void HandleAccept(shared_ptr<CSession>, const boost::system::error_code & error);
void StartAccept();
boost::asio::io_context &_io_context;
short _port;
tcp::acceptor _acceptor;
std::map<std::string, shared_ptr<CSession>> _sessions;
};

整体目录变为
https://cdn.llfc.club/1682157973797.jpg

数据节点设计

我们设计一个数据节点MsgNode用来存储数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MsgNode
{
friend class CSession;
public:
MsgNode(char * msg, int max_len) {
_data = new char[max_len];
memcpy(_data, msg, max_len);
}

~MsgNode() {
delete[] _data;
}

private:
int _cur_len;
int _max_len;
char* _data;
};

1  _cur_len表示数据当前已处理的长度(已经发送的数据或者已经接收的数据长度),因为一个数据包存在未发送完或者未接收完的情况。
2  _max_len表示数据的总长度。
3  _data表示数据域,已接收或者已发送的数据都放在此空间内。

封装发送接口

首先在CSession类里新增一个队列存储要发送的数据,因为我们不能保证每次调用发送接口的时候上一次数据已经发送完,就要把要发送的数据放入队列中,通过回调函数不断地发送。而且我们不能保证发送的接口和回调函数的接口在一个线程,所以要增加一个锁保证发送队列安全性。
同时我们新增一个发送接口Send

1
2
3
void Send(char* msg,  int max_length);
std::queue<shared_ptr<MsgNode> > _send_que;
std::mutex _send_lock;

实现发送接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void CSession::Send(char* msg, int max_length) {
bool pending = false;
std::lock_guard<std::mutex> lock(_send_lock);
if (_send_que.size() > 0) {
pending = true;
}
_send_que.push(make_shared<MsgNode>(msg, max_length));
if (pending) {
return;
}

boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, shared_from_this()));
}

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。
回调函数实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void CSession::HandleWrite(const boost::system::error_code& error, shared_ptr<CSession> _self_shared) {
if (!error) {
std::lock_guard<std::mutex> lock(_send_lock);
_send_que.pop();
if (!_send_que.empty()) {
auto &msgnode = _send_que.front();
boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len),
std::bind(&CSession::HandleWrite, this, std::placeholders::_1, _self_shared));
}
}
else {
std::cout << "handle write failed, error is " << error.what() << endl;
_server->ClearSession(_uuid);
}
}

判断发送队列是否为空,为空则发送完,否则不断取出队列数据调用async_write发送,直到队列为空。

修改读回调

因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void CSession::HandleRead(const boost::system::error_code& error, size_t  bytes_transferred, shared_ptr<CSession> _self_shared){
if (!error) {
cout << "read data is " << _data << endl;
//发送数据
Send(_data, bytes_transferred);
memset(_data, 0, MAX_LENGTH);
_socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), std::bind(&CSession::HandleRead, this,
std::placeholders::_1, std::placeholders::_2, _self_shared));
}
else {
std::cout << "handle read failed, error is " << error.what() << endl;
_server->ClearSession(_uuid);
}
}

总结

该服务器虽然实现了全双工通信,但是仍存在缺陷,比如粘包问题未处理,下一版本实现粘包处理。
源码链接https://gitee.com/secondtonone1/boostasio-learn