简单的异步服务器demo

简介

前文已经介绍了异步操作的api,今天写一个简单的异步echo服务器,以应答为主

Session类

Session类主要是处理客户端消息收发的会话类,为了简单起见,我们不考虑粘包问题,也不考虑支持手动调用发送的接口,只以应答的方式发送和接收固定长度(1024字节长度)的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Session
{
public:
Session(boost::asio::io_context& ioc):_socket(ioc){
}
tcp::socket& Socket() {
return _socket;
}

void Start();
private:
void handle_read(const boost::system::error_code & error, size_t bytes_transfered);
void handle_write(const boost::system::error_code& error);
tcp::socket _socket;
enum {max_length = 1024};
char _data[max_length];
};

1   _data用来接收客户端传递的数据
2   _socket为单独处理客户端读写的socket。
3   handle_read和handle_write分别为读回调函数和写回调函数。
接下来我们实现Session类

1
2
3
4
5
6
7
void Session::Start(){
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length),
std::bind(&Session::handle_read, this, placeholders::_1,
placeholders::_2)
);
}

在Start方法中我们调用异步读操作,监听对端发送的消息。当对端发送数据后,触发handle_read函数

1
2
3
4
5
6
7
8
9
10
11
void Session::handle_read(const boost::system::error_code& error, size_t bytes_transfered) {

if (!error) {
cout << "server receive data is " << _data << endl;
boost::asio::async_write(_socket, boost::asio::buffer(_data, bytes_transfered),
std::bind(&Session::handle_write, this, placeholders::_1));
}
else {
delete this;
}
}

handle_read函数内将收到的数据发送给对端,当发送完成后触发handle_write回调函数。

1
2
3
4
5
6
7
8
9
10
void Session::handle_write(const boost::system::error_code& error) {
if (!error) {
memset(_data, 0, max_length);
_socket.async_read_some(boost::asio::buffer(_data, max_length), std::bind(&Session::handle_read,
this, placeholders::_1, placeholders::_2));
}
else {
delete this;
}
}

handle_write函数内又一次监听了读事件,如果对端有数据发送过来则触发handle_read,我们再将收到的数据发回去。从而达到应答式服务的效果。

Server类

Server类为服务器接收连接的管理类

1
2
3
4
5
6
7
8
9
class Server {
public:
Server(boost::asio::io_context& ioc, short port);
private:
void start_accept();
void handle_accept(Session* new_session, const boost::system::error_code& error);
boost::asio::io_context& _ioc;
tcp::acceptor _acceptor;
};

start_accept将要接收连接的acceptor绑定到服务上,其内部就是将accpeptor对应的socket描述符绑定到epoll或iocp模型上,实现事件驱动。
handle_accept为新连接到来后触发的回调函数。
下面是具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Server::Server(boost::asio::io_context& ioc, short port) :_ioc(ioc),
_acceptor(ioc, tcp::endpoint(tcp::v4(), port)) {
start_accept();
}

void Server::start_accept() {
Session* new_session = new Session(_ioc);
_acceptor.async_accept(new_session->Socket(),
std::bind(&Server::handle_accept, this, new_session, placeholders::_1));
}
void Server::handle_accept(Session* new_session, const boost::system::error_code& error) {
if (!error) {
new_session->Start();
}
else {
delete new_session;
}

start_accept();
}

客户端

客户端的设计用之前的同步模式即可,客户端不需要异步的方式,因为客户端并不是以并发为主,当然写成异步收发更好一些。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <boost/asio.hpp>
using namespace std;
using namespace boost::asio::ip;
const int MAX_LENGTH = 1024;
int main()
{
try {
//创建上下文服务
boost::asio::io_context ioc;
//构造endpoint
tcp::endpoint remote_ep(address::from_string("127.0.0.1"), 10086);
tcp::socket sock(ioc);
boost::system::error_code error = boost::asio::error::host_not_found; ;
sock.connect(remote_ep, error);
if (error) {
cout << "connect failed, code is " << error.value() << " error msg is " << error.message();
return 0;
}

std::cout << "Enter message: ";
char request[MAX_LENGTH];
std::cin.getline(request, MAX_LENGTH);
size_t request_length = strlen(request);
boost::asio::write(sock, boost::asio::buffer(request, request_length));

char reply[MAX_LENGTH];
size_t reply_length = boost::asio::read(sock,
boost::asio::buffer(reply, request_length));
std::cout << "Reply is: ";
std::cout.write(reply, reply_length);
std::cout << "\n";
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << endl;
}
return 0;
}

运行服务器之后再运行客户端,输入字符串后,就可以收到服务器应答的字符串了。

隐患

该demo示例为仿照asio官网编写的,其中存在隐患,就是当服务器即将发送数据前(调用async_write前),此刻客户端中断,服务器此时调用async_write会触发发送回调函数,判断ec为非0进而执行delete this逻辑回收session。但要注意的是客户端关闭后,在tcp层面会触发读就绪事件,服务器会触发读事件回调函数。在读事件回调函数中判断错误码ec为非0,进而再次执行delete操作,从而造成二次析构,这是极度危险的。

总结

本文介绍了异步的应答服务器设计,但是这种服务器并不会在实际生产中使用,主要有两个原因:
1   因为该服务器的发送和接收以应答的方式交互,而并不能做到应用层想随意发送的目的,也就是未做到完全的收发分离(全双工逻辑)。
2   该服务器未处理粘包,序列化,以及逻辑和收发线程解耦等问题。
3   该服务器存在二次析构的风险。
这些问题我们会在接下来的文章中不断完善
源码链接
https://gitee.com/secondtonone1/boostasio-learn