设计思路
文件传输必须满足以下几个条件:
- 限制文件大小(不超过
4G
) - 长连接传输(效率高,支持大文件)
- 客户端和服务器都知道传输进度,以保证支持断点续传(后续实现)
- 先实现服务器单线程处理版本,在实现多线程处理版本
如遇问题可添加我的微信

也可以去我得哔哩哔哩主页查看项目视频详细讲解
B站主页 https://space.bilibili.com/271469206
客户端
客户端还是采用聊天项目客户端封装的TcpClient
, 只是修改了发送逻辑
1 | //发送数据槽函数 |
这里着重叙述以下,发送的格式是id + bodyLength + 文件流数据
其中id 为2字节,bodyLength
为4字节,之后就是传输的文件流
slot_send_msg
是槽函数,和 sig_send_msg
信号连接
1 | //连接 发送数据信号和槽函数 |
客户端在发送数据的时候调用
1 | void TcpClient::sendMsg(quint16 id,QByteArray data) |
客户端在打开文件对话框后选择文件,接下来,点击发送会将文件切分成固定大小的报文发送
1 | void MainWindow::on_uploadBtn_clicked() |
发送时数据字段分别为:
文件
md5
: 以后用来做断点续传校验name
: 文件名seq
: 报文序列号,类似于TCP序列号,自己定义的,服务器根据这个序列号组合数据写入文件。trans_size
: 当前已经传输的大小total_size
: 传输文件的总大小。
客户端需要接受服务器返回的消息更新进度条
1 | //接受服务器发送的信息 |
处理消息更新进度条
1 | void TcpClient::processData() |
单线程逻辑服务器
我们先讲解单线程处理收包逻辑的服务器,以后再给大家将多线程的。
服务器要配合客户端,对报文头部大小做修改
1 | //头部总长度 |
其余逻辑和我们在网络编程中讲的IocontextPool
模型服务器一样
服务器收到报文头后调用LogicSystem
来处理
1 | void CSession::AsyncReadBody(int total_len) |
我们知道LogicSystem
会将消息投递到队列里,然后单线程处理, 服务器LogicSystem
注册上传逻辑
1 | void LogicSystem::RegisterCallBacks() { |
收到上传消息后写入文件。
多线程逻辑服务器
多线程逻辑服务器主要是为了缓解单线程接受数据造成的瓶颈,因为单线程接收数据,就会影响其他线程接收数据,所以考虑引入线程池处理收到的数据。
在多线程编程中我们讲过划分多线程设计的几种思路:
- 按照任务划分,将不同的任务投递给不同的线程
- 按照线程数轮询处理
- 按照递归的方式划分
很明显我们不是做二分查找之类的算法处理,所以不会采用第三种。
现在考虑第二种,如果客户端发送一个很大的文件,客户端将文件切分为几个小份发送,服务器通过iocontext
池接受数据, 将接受的数据投递到线程池。
我们知道线程池处理任务是不分先后顺序的,只要投递到队列中的都会被无序取出处理。
会造成数据包处理的乱序,当然可以最后交给一个线程去组合,统一写入文件,这么做的一个弊端就是如果文件很大,那就要等待完全重组完成才能组合为一个统一的包,如果文件很大,这个时间就会很长,当然也可以暂时缓存这些数据,每次收到后排序组合,比较麻烦。
所以这里推荐按照任务划分。
按照任务划分就是按照不同的客户端做区分,一个客户端传输的数据按照文件名字的hash值划分给不同的线程单独处理,也就是一个线程专门处理对应的hash值的任务,这样既能保证有序,又能保证其他线程可以处理其他任务,也有概率会命中hash同样的值投递给一个队列,但也扩充了并发能力。
因为我们之前的逻辑处理也是单线程,所以考虑在逻辑层这里做一下解耦合,因为这个服务只是用来处理数据接受,不涉及多个连接互相访问。所以可以讲logic线程扩充为多个,按照sessionid
将不同的逻辑分配给不同的线程处理。
多线程处理逻辑
将LogicSystem
中添加多个LogicWorker
用来处理逻辑
1 | typedef function<void(shared_ptr<CSession>, const short &msg_id, const string &msg_data)> FunCallBack; |
实现投递逻辑
1 | LogicSystem::LogicSystem(){ |
每一个LogicWorker都包含一个线程,这样LogicWorker可以在独立的线程里处理任务
1 | class LogicWorker |
LogicWorker启动一个线程处理任务
1 |
|
当然要提前注册好任务
1 | void LogicWorker::RegisterCallBacks() |
处理逻辑
1 | void LogicWorker::task_callback(std::shared_ptr<LogicNode> task) |
比如对于文件上传,ID_UPLOAD_FILE_REQ
就调用对应的回调,在回调函数里我们再次将要处理的任务封装好投递到文件系统
1 | FileSystem::GetInstance()->PostMsgToQue( |
文件系统和逻辑系统类似,包含一堆FileWorker
1 | class FileSystem :public Singleton<FileSystem> |
实现投递逻辑
1 | FileSystem::~FileSystem() |
定义文件任务
1 | class CSession; |
实现文件工作者
1 | class FileWorker |
构造函数启动线程
1 | FileWorker::FileWorker():_b_stop(false) |
析构需等待线程
1 | FileWorker::~FileWorker() |
投递任务
1 | void FileWorker::PostTask(std::shared_ptr<FileTask> task) |
因为线程会触发回调函数保存文件,所以我们实现回调函数
1 | void FileWorker::task_callback(std::shared_ptr<FileTask> task) |
测试效果
源码链接
https://gitee.com/secondtonone1/boostasio-learn/tree/master/network/day26-multithread-res-server