如何设计一个符合工信部网站备案系统的网页?
摘要:工信部网站备案系统,完整的网页设计代码,怎样创建网站,做电影网站怎么接广告文章目录 一、Buffer模块1.为什么需要Buffer缓冲区2.Buffer模块的设计3.Buffer模块的实现4.Buffer缓冲区的其它设计方案 二、Socke
工信部网站备案系统,完整的网页设计代码,怎样创建网站,做电影网站怎么接广告文章目录 一、Buffer模块1.为什么需要Buffer缓冲区2.Buffer模块的设计3.Buffer模块的实现4.Buffer缓冲区的其它设计方案 二、Socket模块1.Socket模块的设计2.Socket代码实现 三、Acceptor模块1.Acceptor模块的设计与实现2.Acceptor模块完整代码实现 四、定时器模块1.时间轮的思… 文章目录 一、Buffer模块1.为什么需要Buffer缓冲区2.Buffer模块的设计3.Buffer模块的实现4.Buffer缓冲区的其它设计方案 二、Socket模块1.Socket模块的设计2.Socket代码实现 三、Acceptor模块1.Acceptor模块的设计与实现2.Acceptor模块完整代码实现 四、定时器模块1.时间轮的思想2.TimerTask类3.TimerWheel类 五、线程池模块1.LoopThread类2.LoopThreadPool类 一、Buffer模块
1.为什么需要Buffer缓冲区
我们实现的TCP网络服务器必须要有发送缓冲区和接收缓冲区这一点是毋庸置疑的。考虑以下两种情况
如果服务器没有发送缓冲区那么当服务器想向客户端发送假设100KB的数据时调用操作系统的write接口进行发送。但是如果操作系统准确来说是操作系统的缓冲区只接收了80KB我们服务器还有20KB的数据没有发送出去这个时候只能阻塞在write接口处等待最后20KB数据发送出去。如果有了发送缓冲区我们就能将这20KB的数据再放回发送缓冲区等什么时候操作系统缓冲区能接收新数据了再进行第二次发送。如果服务器没有接收缓冲区由于TCP是面向字节流的协议也就是说它的数据报文是没有明显边界的那么就可能会出现一次接收的数据不完整或者数据粘包的问题这时候我们拿到不完整的数据既不能解析也不能丢弃只有接收缓冲区能让我们缓存这些不完整的数据报文等到下一次对方再发送新数据的时候我们读取到完整的报文时再交给上层进行业务处理。
这两个简单常见的例子就足以说明TCP服务器必须要有Buffer缓冲区。
2.Buffer模块的设计
muduo库的作者陈硕老师在《Linux多线程服务端编程使用muduo C网络库》书中提到muduo Buffer的设计考虑了常见的网络编程需求他试图在易用性和性能之间找一个平衡点目前这个平衡点更偏向于易用性。所以muduo库的Buffer设计要点如下
对外表现是一块连续的内存空间这样更方便代码的编写。Buffer缓冲区的空间大小是可变的支持动态扩容以适应不同大小的消息。虽然服务器需要接收缓冲区和发送缓冲区但我们不希望分开两个缓冲区来写而是希望一块内存空间既能读也能写做到读写配合。
从连续的内存空间、支持动态扩容这两个要点来看Buffer缓冲区底层的数据结构应该选择vector最合适。并且还需要两个index分别代表读位置和写位置这样就能满足一块内存空间的读写配合。有个细节的地方需要注意读位置和写位置的index不应该设置为指针类型或者迭代器类型最好使用下标来表示因为vector如果需要扩容将数据拷贝到新空间会出现迭代器失效的情况需要我们去处理迭代器失效从易用性角度考虑使用下标是最好的。
确定了Buffer的数据结构以后我们可以很容易地想象出Buffer的结构图如下图所示 通过起始位置、读位置reader_index、写位置writer_index、末尾位置我们可以将Buffer分成三部分
读位置之前的空间即起始位置和读位置之间的空间我们称为头部空闲空间读位置和写位置之间的空间我们称为有效数据。这部分存放的就是可读数据写位置之后的空间即写位置和末尾位置之间的空间我们称为尾部空闲空间。
划分完成之后Buffer缓冲区的读写数据方式会变得非常清晰。读位置和写位置之间的空间就是有效数据区域也就是可读数据。当我们要从Buffer缓冲区中读取数据时从读位置reader_index开始向后读取指定的字节读取完以后再将读位置reader_index向后移动读取的字节数。写入也是同理当我们要向Buffer缓冲区中写入数据时从写位置writer_index开始向后写入指定的字节写入完以后再将写位置writer_index向后移动写入的字节数。当读位置reader_index和写位置writer_index相遇的时候说明缓冲区已经没有可读数据了。
如果写位置writer_index一直写入到末尾位置尾部空闲空间不够写入怎么办 Buffer模块是支持动态扩容的当我们向Buffer缓冲区写入指定字节的数据时它先会去检查尾部空闲空间能不能容纳这些字节的数据如果不能再将头部空闲空间加入进来判断尾部空闲空间加上头部空闲空间是否能够容纳如果可以的话就将可读数据挪动到起始位置更新读位置reader_index和写位置writer_index后再写入新数据例如下列示意图 如果尾部空闲空间加上头部空闲空间都不够写入那就只能从写位置writer_index开始向后扩容了。需要注意的是扩容之后的Buffer并不会缩小空间大小比如扩容到1000字节的大小下一次写入比1000字节小的数据也不会重新开辟分配空间。也就是说muduo库的Buffer缓冲区空间大小是自适应的比如一开始空间大小是1KB程序经常收发10KB的数据那么用了几次之后它的空间大小会自动增长到10KB然后就保持不变。这样做的好处是一方面避免浪费内存即一开始不会开很大的空间而是不够了再去扩容。另一方面是避免反复分配内存扩容之后申请了多大的内存空间就保持不变了只会扩大不会缩小因为分配内存也是需要时间开销的。
3.Buffer模块的实现
有了上述的设计思路以后我们实现一个Buffer模块并不是什么难事这里直接把代码贴出来了。我写这些文章的目的是记录每个模块的设计思路以及细节学习muduo库设计的高性能高并发版本服务器所以代码实现并不是最重要的思路更重要这些简单代码就不过多介绍浪费篇幅了。
#define BUFFER_DEFAULT_SIZE 1024class Buffer
{
public:Buffer(): _reader_index(0), _writer_index(0), _buffer(BUFFER_DEFAULT_SIZE){}// 获取当前写入起始地址char *getWriteStartPosition(){// buffer空间的起始地址加上写偏移量return begin() _writer_index;}// 获取当前读取起始地址char *getReadStartPosition(){return begin() _reader_index;}// 获取缓冲区末尾空间大小--写偏移之后的空闲空间uint64_t getTailSpaceSize(){// 整体空间大小减去写偏移return _buffer.size() - _writer_index;}// 获取缓冲区起始空间大小--读偏移之前的空闲空间uint64_t getHeadSpaceSize(){return _reader_index;}// 获取可读数据大小uint64_t getReadableSize(){return _writer_index - _reader_index;}// 将读偏移向后移动void moveReadOffset(uint64_t len){if(len 0){return;}assert(len getReadableSize());_reader_index len;}// 将写偏移向后移动void moveWriteOffset(uint64_t len){assert(len getTailSpaceSize());_writer_index len;}// 确保可写空间足够整体空闲空间够了就移动数据否则就扩容void ensureWriteSpace(uint64_t len){// 如果末尾空闲空间大小足够直接返回if (len getTailSpaceSize()){return;}// 如果末尾空闲空间不够则判断加上起始空闲空间大小是否足够够的话就将数据到起始位置else if (len getHeadSpaceSize() getTailSpaceSize()){uint64_t readAbleSize getReadableSize();std::copy(getReadStartPosition(), getReadStartPosition() readAbleSize, begin());_reader_index 0;_writer_index readAbleSize;}else{// 扩容_buffer.resize(_writer_index len);}}// 写入数据void write(const void *data, uint64_t len){// 1.保证有足够空间 2.拷贝数据进行if(len 0){return;}ensureWriteSpace(len);const char *chardata (const char *)data;std::copy(chardata, chardata len, getWriteStartPosition());}void writeAndPush(const void *data, uint64_t len){write(data, len);moveWriteOffset(len);}void writeFromString(const std::string data){write(data.c_str(), data.size());}void writeStringAndPush(const std::string data){writeFromString(data);moveWriteOffset(data.size());}void writeFromBuffer(Buffer data){write(data.getReadStartPosition(), data.getReadableSize());}void writeBufferAndPush(Buffer data){writeFromBuffer(data);moveWriteOffset(data.getReadableSize());}// 读取数据void read(void *buf, uint64_t len){// 要求要获取的数据大小必须小于可读数据大小assert(len getReadableSize());std::copy(getReadStartPosition(), getReadStartPosition() len, (char *)buf);}void readAndPop(void *buf, uint64_t len){read(buf, len);moveReadOffset(len);}std::string readAsString(uint64_t len){assert(len getReadableSize());std::string str;str.resize(len);read(str[0], len);return str;}std::string readAsStringAndPop(uint64_t len){assert(len getReadableSize());std::string str readAsString(len);moveReadOffset(len);return str;}// 寻找换行字符char *findCRLF(){char *findRes (char *)memchr(getReadStartPosition(), \n, getReadableSize());return findRes;}std::string getLine(){char *pos findCRLF();if (pos nullptr){return ;}return readAsString(pos - getReadStartPosition() 1);}std::string getLineAndPop(){std::string str getLine();moveReadOffset(str.size());return str;}// 清空缓冲区void clear(){_reader_index 0;_writer_index 0;}private:char *begin(){return (*_buffer.begin());}private:std::vectorchar _buffer; // 使用vector进行内存空间管理uint64_t _reader_index; // 读偏移uint64_t _writer_index; // 写偏移
};4.Buffer缓冲区的其它设计方案
其实在刚开始学习muduo库的Buffer缓冲区时我很好奇为什么会选择vector这个数据结构作为缓冲区的底层数据结构呢要知道vector的优势在于连续性但是插入和删除的时间复杂度都是比较高的因为vector的插入和删除会涉及到很多数据挪动时间复杂度往往都是O(N^2)级别的。而muduo库不是高性能的网络版本服务器吗怎么还会用这种低效的方式呢我一开始以为会是什么复杂的高深的数据结构来实现将时间复杂度降到最低毕竟服务器不就是追求性能和效率吗
其实这个问题的答案作者在书中已经给我们解答了还记得文章开始我们介绍过作者陈硕老师在《Linux多线程服务端编程使用muduo C网络库》书中提到“muduo Buffer的设计考虑了常见的网络编程需求试图在易用性和性能之间找一个平衡点目前这个平衡点更偏向于易用性”。所以选择vector作为Buffer的存储结构是因为偏向易用性为了让代码实现以及muduo库组件的使用都更简单方便。
作者在书中也提供了其它几种设计方案感兴趣的读者可以去翻阅一下这本书。其中最高效的方案应该是zero copy方案该方案是实现分段连续的zero copy buffer。这个方案虽然使得服务器性能更好更高效但代价就是代码实现变得复杂晦涩难懂。并且由于Buffer缓冲区不是连续的parse消息会变得比较麻烦。
那么为什么muduo库的Buffer选择偏向易用性而不是偏向高性能呢 其实我们都可以发现muduo库的Buffer模块有很多可以优化从而提高效率的地方。那么我们可能会怀疑muduo库的性能会不会太低了作者给出的解释是“可以优化但不一定值得优化”。
目前最常用的千兆以太网的吞吐量基本都是几百兆每秒而现在服务器上最常用的DDR2/DDR3内存的带宽至少是4GB/s比千兆以太网高40倍以上。也就是说对于Buffer缓冲区中几KB或几十KB大小的数据在内存中复制几次根本不是问题因为受千兆以太网延迟和带宽的限制跟这个程序通信的其他机器上的程序不会察觉到性能差异。
再比如说如果我们的服务器上层应用需要和数据库打交道那么性能瓶颈往往出现在数据库的数据交互上提高服务器本身的这点性能尤其是Buffer缓冲区模块的这一点点性能提升对整个程序本身的优化是微不足道的因为从数据库中读取一次数据往往就抵消了Buffer模块所做的所有低等级优化这时还不如把精力放在DB调优上。
所以muduo库的Buffer模块可以优化但优化的意义往往不大因为这些性能提升往往是微不足道的。如果确实在内存带宽方面遇到问题或者确实有性能的需要可以考虑将Buffer模块改写为zero copy方案甚至可以考虑将程序放到Linux内核中去而不是在用户态尝试各种优化。因为只有把程序做到操作系统内核才能真正实现zero copy否则内核态和用户态之间始终有一次内存拷贝。
二、Socket模块
1.Socket模块的设计
Socket模块是封装了socket套接字操作也就是将Linux的那一套TCP socket套接字操作封装成一个Socket类这个类比较简单需要的功能有以下几个
创建套接字即封装socket创建套接字函数操作。绑定地址信息即封装bind函数操作。设置开始监听即封装listen函数操作。设置客户端发起连接请求即封装connect函数操作。获取新连接即封装accept函数操作。接收数据即封装recv函数操作。发送数据即封装send函数操作。最后提供创建服务器连接接口和创建客户端连接接口。
2.Socket代码实现
#define MAX_LISTEN 1024class Socket
{
public:Socket(): _sockFd(-1){}Socket(int fd): _sockFd(fd){}~Socket(){closeSocket();}// 创建套接字bool createSocket(){_sockFd socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);if (_sockFd 0){LOG(create socket error);return false;}return true;}// 绑定地址信息bool bindSocket(const std::string ip, uint16_t port){sockaddr_in addr;addr.sin_family AF_INET;addr.sin_port htons(port);addr.sin_addr.s_addr ip.size() 0 ? INADDR_ANY : inet_addr(ip.c_str());socklen_t len sizeof(sockaddr_in);int bindRes bind(_sockFd, (sockaddr *)addr, len);if (bindRes 0){LOG(bind error);return false;}return true;}// 开始监听bool listenSocket(int backlog MAX_LISTEN){int listenRes listen(_sockFd, backlog);if (listenRes 0){LOG(listen error);return false;}return true;}// 向服务器发起连接// 这个接口是给客户端向服务端发起连接请求的bool connectSocket(const std::string ip, uint16_t port){sockaddr_in addr;addr.sin_family AF_INET;addr.sin_port htons(port);addr.sin_addr.s_addr inet_addr(ip.c_str());socklen_t len sizeof(sockaddr_in);int connectRes connect(_sockFd, (sockaddr *)addr, len);if (connectRes 0){LOG(connect error);return false;}return true;}// 获取新连接int acceptConnect(){int newFd accept(_sockFd, nullptr, nullptr);if (newFd 0){LOG(accept error);return -1;}return newFd;}// 接收数据ssize_t recvData(void *buf, size_t len, int flag 0){ssize_t recvRes recv(_sockFd, buf, len, flag);if (recvRes 0){if (errno EAGAIN || errno EINTR){return 0;}LOG(recv error);return -1;}return recvRes;}ssize_t nonBlockRecv(void *buf, size_t len){return recvData(buf, len, MSG_DONTWAIT);}// 发送数据ssize_t sendData(const void *buf, size_t len, int flag 0){ssize_t sendRes send(_sockFd, buf, len, flag);if (sendRes 0){if (errno EAGAIN || errno EINTR){return 0;}LOG(send error);return -1;}return sendRes;}ssize_t nonBlockSend(void *buf, size_t len){if (len 0){return 0;}return sendData(buf, len, MSG_DONTWAIT);}// 关闭套接字void closeSocket(){if (_sockFd ! -1){close(_sockFd);_sockFd -1;}}// 创建一个服务端连接// 这个函数接口是服务端调用的用来创建一个服务端连接bool createServer(uint16_t port, const std::string ip , bool blockFlag false){if (createSocket() false){return false;}if (bindSocket(ip, port) false){return false;}if (listenSocket() false){return false;}if (blockFlag){nonBlock();}reuseAddress();return true;}// 创建一个客户端连接bool createClient(uint16_t port, const std::string ip){if (createSocket() false){return false;}if (connectSocket(ip, port) false){return false;}return true;}// 设置套接字选项---开启地址端口重用void reuseAddress(){int val 1;setsockopt(_sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)val, sizeof(int));val 1;setsockopt(_sockFd, SOL_SOCKET, SO_REUSEPORT, (void *)val, sizeof(int));}// 设置套接字阻塞属性---设置为非阻塞void nonBlock(){int flag fcntl(_sockFd, F_GETFL, 0);fcntl(_sockFd, F_SETFL, flag | O_NONBLOCK);}int getSocketFd(){return _sockFd;}private:// socket创建的套接字int _sockFd;
};三、Acceptor模块
1.Acceptor模块的设计与实现
Acceptor模块就是主Reactor获取新连接的模块它与主Reactor关联由最顶层的TcpServer模块将它与EventLoop模块关联起来这个模块比较简单我们可以通过设计与实现来了解该模块的原理以及功能。首先看一下Acceptor的类内成员变量
_socket该变量用于创建服务器我们上面已经封装了Socket类所以可以通过_socket这个变量快速地调用Socket类接口通过socket套接字操作搭建服务器。_eventLoop该变量就是主Reactor只负责监控新连接的到来。_channel该变量是主Reactor的事件管理类变量。_acceptCallBack该变量是新连接到来以后执行的回调函数这个回调函数由外部设置Acceptor类负责调用。
private:Socket _socket;// 用于创建监听套接字EventLoop *_eventLoop;// 用于对监听套接字进行事件监控Channel _channel;// 用于对监听套接字进行事件管理AcceptCallBack _acceptCallBack;接下来是Acceptor类的构造函数构造函数初始化主Reactor以及创建服务器同时设置主Reactor可读事件触发的回调函数。 // 不能将启动读事件监控放到构造函数中必须在设置回调函数之后再去启动// 否则有可能造成启动监控后立即有事件到来了但是处理的时候回调函数还没设置所以新连接得不到处理并且资源泄露Acceptor(EventLoop *eventLoop, uint16_t port):_eventLoop(eventLoop), _socket(createServer(port)), _channel(eventLoop, _socket.getSocketFd()){_channel.setReadAbleCallBack(std::bind(Acceptor::handleRead, this));}构造函数里设置的主Reactor可读事件触发回调函数是handleRead函数这个函数也是Acceptor类内成员函数该函数首先是调用accept函数操作获取新连接。因为TCP服务器的套接字操作步骤是socket创建套接字、bind绑定地址信息、listen设置监听状态最后accept获取连接。当有连接到来的时候进程会从accept函数调用处返回返回以后handleRead函数再去调用外部设置的新连接到来触发的回调函数即调用成员变量_acceptCallBack。 // 监听套接字的读事件回调函数---获取新连接调用_acceptCallBack函数进行新连接处理void handleRead(){int newFd _socket.acceptConnect();if(newFd 0){return;}if(_acceptCallBack){_acceptCallBack(newFd);}}事实上这个_acceptCallBack是由TcpServer模块设置的这一整条链路的逻辑是这样的Acceptor构造函数初始化_socket时调用createServer函数创建服务器createServer函数只会执行socket创建套接字、bind绑定地址信息、listen设置监听状态的操作并不会调用accept获取连接因为accept函数如果没有连接到来的话是会让线程或进程阻塞的。Acceptor构造函数设置的主Reactor的可读事件回调函数是handleRead这个handleRead函数只会在可读事件触发时被调用主Reactor的可读事件只会在新连接到来时触发。所以当有新连接到来时主Reactor的可读事件触发调用handleRead函数执行。
我们再来看看handleRead函数内部执行了什么首先是调用accept函数获取连接也就是说这种写法不用进程或线程阻塞在accept函数调用处等待连接的到来而是当连接到来的时候以可读事件触发的形式调用handleRead函数去执行accept获取连接这一次获取连接是不需要等待就能获取成功的所以接下来可以执行外部设置的连接到来回调函数_acceptCallBack。
这样又回到上面说的这个_acceptCallBack是由TcpServer模块设置的设置的函数就是newConnection这个函数后续会介绍它的功能就是为新连接创建Connection对象去管理。至此主Reactor如何获取新连接以及Acceptor的原理和功能我们都清楚了。这个模块涉及了一些回调函数并且还与其它多个模块关联如果不仔细梳理很难彻底弄清楚Acceptor模块到底是做什么的也很难弄清楚该模块与其它模块的关系。
2.Acceptor模块完整代码实现
介绍完了关键几个代码之后剩下的函数接口都比较简单这里就不过多讲解了贴上代码即可。
using AcceptCallBack std::functionvoid(int);class Acceptor
{
public:// 不能将启动读事件监控放到构造函数中必须在设置回调函数之后再去启动// 否则有可能造成启动监控后立即有事件到来了但是处理的时候回调函数还没设置所以新连接得不到处理并且资源泄露Acceptor(EventLoop *eventLoop, uint16_t port):_eventLoop(eventLoop), _socket(createServer(port)), _channel(eventLoop, _socket.getSocketFd()){_channel.setReadAbleCallBack(std::bind(Acceptor::handleRead, this));}void setAcceptCallBack(const AcceptCallBack callBack){_acceptCallBack callBack;}void startListen(){_channel.startReadAbleEvent();}private:// 监听套接字的读事件回调函数---获取新连接调用_acceptCallBack函数进行新连接处理void handleRead(){int newFd _socket.acceptConnect();if(newFd 0){return;}if(_acceptCallBack){_acceptCallBack(newFd);}}int createServer(uint16_t port){bool ret _socket.createServer(port);assert(ret true);return _socket.getSocketFd();}
private:Socket _socket;// 用于创建监听套接字EventLoop *_eventLoop;// 用于对监听套接字进行事件监控Channel _channel;// 用于对监听套接字进行事件管理AcceptCallBack _acceptCallBack;
};四、定时器模块
1.时间轮的思想
为了防止一个连接长时间不与服务器进行信息交互但又霸占着连接资源所以我们的服务器必须要有定时器模块。因为服务器的资源是有限的最简单的来说每个连接就是一个socket其实本质上是文件描述符而Linux操作系统中文件描述符是有限资源如果被大量这种不通信又不断开的连接霸占那么后面到来的新连接就没办法获取文件描述符了。所以定时器模块可以让网络库的使用者设定一个超时时间当连接距离上一次通信的时间到当前时刻的时间间隔大于超时时间服务器会主动断开连接。
实现定时器模块首先需要有计时的功能muduo库使用的计时器功能是Linux操作系统提供的timerfd这是以文件描述符方式管理超时提醒的机制。timerfd_fd函数能创建一个系统的计时器我们设置计时时间操作系统会给我们返回一个文件描述符当设置的时间到了操作系统会向文件里写入数据数据表示距离上一次读取超时了多少次。有了这个机制我们可以很好地将计时功能与epoll多路转接联系起来使用。我们可以将文件描述符放在epoll模型中监控可读事件当可读事件触发时说明操作系统向该文件写入数据了也就是说明时间到了这时候再由epoll_wait函数返回可读事件经由上层用户去处理超时连接。这里就不介绍timerfd是具体使用了只需要知道是用来做什么的就可以不了解如何使用的可以去学习一下接口。
但是如果只使用timerfd显然无法满足我们的需求试想一下仅仅使用timerfd来计时的话最好的方式就是全局定义一个计时器然后每个连接记录一个距离上次通信的时间检查超时销毁连接的操作就变成了需要遍历所有的连接如果超时了再将其销毁。这样的做法针对服务器中大量连接的情况是不实际的。所以需要思考的就是如何高效地组织这些定时器能够快速地找出当前时刻已经超时的连接。muduo的做法是使用set数据结构也就是底层使用的红黑树把每个连接按到期时间先后排序操作的时间复杂度是O(logN)。但除此之外作者在书中还介绍了时间轮的方案虽然作者没有实现在muduo库源码上但通过作者描述的思路我更感兴趣这一种实现方案下面我将详细介绍时间轮方案的实现。
其实我翻阅了一些书籍以及搜索了网上的一些文章定时器的实现方案还是有许多的比如按到期时间先后排序的队列、以到期时间建立最小堆、红黑树、时间轮。这些都是比较常见的实现方案对比了一下其实会发现时间轮方案比其它都更优越。
我在项目中实现的时间轮是多级时间轮不过我的多级时间轮只有两级因为多了没必要秒级时间轮和分钟级时间轮就已经能大致满足需求了毕竟一个连接多长时间未通信销毁其实没有固定值一般也是根据场景来确定。几秒钟几十秒就关闭连接太短了几个小时才关闭连接似乎又太长了所以分钟级别是比较合适的。
秒级时间轮和分钟级时间轮其实是一样的数据结构它们都是有一个vector数组这个数组有60个元素分别代表60秒和60分钟。vector数组每一个元素下面还跟着一个数组这个数组就是超时任务对象。具体的运作逻辑是秒针每秒向后移动一格然后将当前指向的所有超时任务销毁这就是时间轮的基本思想。由于秒针是每秒向后移动一格然后秒针走到哪里就执行哪里的超时任务所以一维的vector数组应该被我们设计成循环队列这一点也不难理解因为限制了长度是60循环队列才能让秒针一直在转就像时钟一样不停地转圈圈。
举个例子更好地说明一下假设初始状态秒针和分针都指向0下标处此时有一个5秒之后的超时任务所以在秒针当前位置向后加5的位置插入这个超时任务。由于秒针是每秒向前走一格的走到5位置处刚好是5秒也就意味着改任务的时间到了可以执行超时处理了。 上图虽然只有秒级时间轮但加入分钟级时间轮也很好理解这其实和现实生活中的时钟是一样的道理秒级时间轮的指针走了60格以后分钟级时间轮的指针才能向前走1格就好比现实中秒针走一圈分针才向前走一格。这就是时间轮的思想理解起来应该不困难接下来介绍时间轮定时器模块的具体设计与实现。
2.TimerTask类
首先我们需要有一个超时任务类因为时间轮管理的是一个个的超时任务对象它只负责如何高效地管理这些对象具体如何执行超时以后的操作还是通过超时任务对象来确定的。所以TimerTask类就是设置超时任务类。它可以设置超时的时间是多少以及超时以后该如何处理。超时以后如何处理是上层决定的事情定时器模块需要的只是一个回调函数超时以后调用该回调函数即可。
除此之外我们不希望时间轮的指针走到超时位置时还要一个一个TimerTask对象去执行超时对应的回调函数我们希望这些TimerTask对象自己调用这些回调。所以我们采用RAII的思想让TimerTask类在构造函数里完成这些超时时间、超时回调函数的设置在析构函数里调用超时回调函数。这样就只需要在时间轮里释放TimerTask对象即可它会在析构函数里自动调用超时回调函数非常方便。
TimerTask的代码不难实现这里就直接给出了具体是看TimerTask对象如何在时间轮里被使用。
/// brief 定时器任务类这个类实例化出来的对象在生命周期内就是一个定时任务当生命周期结束的时候就代表超时了就会执行超时的任务
/// 这里其实使用了RAII的思想
class TimerTask
{
public:// 构造函数需要告诉我定时器任务对象的id是什么超时时间是什么以及超时以后处理的任务是什么TimerTask(uint64_t id, uint32_t timeout, const TaskFunc task): _id(id),_timeout(timeout),_task(task),_cancelFlag(false){}// 析构函数在析构的时候执行超时任务~TimerTask(){// 如果上层用户没有取消超时任务就执行超时任务if (_cancelFlag false){_task();}// 调用release回调函数_release();}void setRelease(const ReleaseFunc release){_release release;}uint32_t getTimeOut(){return _timeout;}void cancel(){_cancelFlag true;}private:uint64_t _id; // 定时器任务对象的IDuint32_t _timeout; // 定时任务的超时时间bool _cancelFlag; // false表示没有被取消true表示被取消TaskFunc _task; // 定时器对象要执行的定时任务ReleaseFunc _release; // 用于删除TimerWheel中保存的定时器对象信息
};3.TimerWheel类
TimerWheel类就是我们要实现的时间轮首先看一下TimerWheel类成员变量的设计
private:int _second_tick; // 当前的秒针走到哪里释放哪里就相当于执行哪里的任务int _minute_tick; // 当前的分针int _capacity; // 时间轮表盘最大数量其实就是最大延迟时间// 这是个二维数组其实就是一个桶结构一维的每一个元素代表每1s的时间// 每秒下面挂的就是这一时刻的定时器任务对象的shared_ptrstd::vectorstd::vectorTaskSharedPtr _second_wheel; // 秒级时间轮std::vectorstd::vectorTaskSharedPtr _minute_wheel; // 分钟级时间轮// _timers是用来保存定时器任务对象的weak_ptr的key值是定时器任务对象的ID值用来索引对应的定时器任务对象// value值是保存定时器任务对象的weak_ptr// 这里用weak_ptr的原因是不会使shared_ptr的引用计数增加// 如果使用shared_ptr的话在插入到_timers中或者从_timers中获取对象的时候都会拷贝shared_ptr对象使得引用计数增加这样是不合理的std::unordered_mapuint64_t, TaskWeakPtr _timers;EventLoop *_eventLoop;int _timerfd; // 定时器描述符// 定时器任务对象的时间管理对象指针std::unique_ptrChannel _timerChannel;成员变量中首先需要的是两个时间轮分别是秒级时间轮和分钟级时间轮以及两个时间轮的指针分别代表秒针和分针。还需要一个_timers哈希表用来保存所有的定时任务TimerTask对象这也很有必要因为我们需要将TimerTask对象管理起来以后需要对其进行查找使用哈希表效率比较高。
重点的是我们这里时间轮管理的定时任务TimerTask对象并不是使用原生的对象指针而是使用智能指针我认为这是设计的一个巧妙之处这里用智能指针主要有以下场景的考虑
当我们需要刷新定时任务的时间时比如说在2s的时候一个新连接到来了服务器为这个连接建立了一个定时任务5s之后如果不通信的话就销毁连接也就是应该在第7s销毁连接那么这个定时任务就应该添加在秒级时间轮下标为7的位置。但如果当秒针走到3s处这个连接有一次新的通信我们就应该刷新定时任务的销毁时间就不应该是第7s销毁了因为最近一次通信是第3s按照5s之后不活跃销毁的规则真正销毁的时间应该是第8s。这时如果不使用智能指针而是使用原生的对象指针是会出问题的因为第7s和第8s处都有这个定时任务指针会被释放两次这是会出错误的。所以使用shared_ptr智能指针在这个地方非常有必要我们可以在第8s新插入这个定时任务对象的shared_ptr即使秒针走到第7s释放了原来的shared_ptr由于引用计数不为0所以不会真正释放对象等到第8s的时候才会真正释放这就是使用shared_ptr智能指针的原因。
接下来再介绍多层级的时间轮是如何实现的首先定时任务TimerTask对象会设置timeout超时时间如果这个超时时间大于或等于60说明超时时间在一分钟以上那就需要使用分钟级时间轮和秒级时间轮了。否则的话秒级时间轮就够用了。如果要用分钟级时间轮假设超时时间timeout为110s即1min50s那么就在分针级时间轮的第1格插入这个TimerTask对象。当秒针走过了60格以后分针就会向前走1格此时并不释放分针级时间轮第一个的这个TimerTask对象而是将该定时任务对象转移到秒级时间轮对应的格子中比如这个110s的TimerTask对象就应该转移到秒级时间轮的第50格。当下一次秒针再走到第50格的时候前后就一共走了6050110s就可以释放这个TimerTask对象了。也就是说分钟级时间轮不处理定时任务只将定时任务转移到秒级时间轮只有秒级时间轮才会处理超时任务。 接下来我们可以介绍addTimer、refreshTimer和runTimerTask三个函数接口了。
addTimer 首先是addTimer函数接口这个接口是添加一个定时任务TimerTask对象到时间轮中由于我们有秒级时间轮和分钟级时间轮所以需要对TimerTask对象的超时时间timeout进行判断如果timeout大于或等于60即超过一分钟以上那就需要将这个TimerTask对象放在分钟级时间轮上。否则的话就放在秒级时间轮上。该函数的具体实现如下 // 添加定时任务到EventLoop中void addTimerToEventLoop(uint64_t id, uint32_t timeout, const TaskFunc task){// 首先new一个定时器任务对象出来交给shared_ptr管理TaskSharedPtr ptr(new TimerTask(id, timeout, task));// 设置定时器任务对象的release回调函数设置为removeTimer函数// 这个release函数是用来清理TimerWheel中保存的定时器任务对象信息的ptr-setRelease(std::bind(TimerWheel::removeTimer, this, id));// 设置完以后计算超时时间如果timeout60说明要用到分钟轮if (timeout 60){int minute_timeout timeout / 60;int minute_pos (_minute_tick minute_timeout) % _capacity;_minute_wheel[minute_pos].push_back(ptr);}else{int second_pos (_second_tick timeout) % _capacity;_second_wheel[second_pos].push_back(ptr);}// 构造一个weak_ptr到_timers中保存定时器任务对象的信息_timers[id] TaskWeakPtr(ptr);}refreshTimer refreshTimer是刷新定时任务的函数接口也就是上面提到的使用智能指针在这种场景下的好处。实现这个接口也不难只需要在当前指针加上timeout的位置处插入一个管理TimerTask对象的shared_ptr智能指针就可以实现刷新操作。 // 刷新或者延迟定时任务bool refreshTimerInEventLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来添加到时间轮中// 首先通过定时器任务ID到_timers中查找对应的定时器任务对象的weak_ptrauto iter _timers.find(id);if (iter _timers.end()){return false;}// 找到了以后将对应的weak_ptr转换成shared_ptrTaskSharedPtr ptr iter-second.lock();// 获取这个定时器任务对象的timeout超时时间int timeout ptr-getTimeOut();if (timeout 60){int minute_timeout timeout / 60;int minute_pos (_minute_tick minute_timeout) % _capacity;_minute_wheel[minute_pos].push_back(ptr);}else{int second_pos (_second_tick timeout) % _capacity;_second_wheel[second_pos].push_back(ptr);}return true;}runTimerTask runTimerTask函数接口其实就是让秒针每秒向前走一格如果秒针已经走了一圈了就让分针向前走一格。因此这个函数必须每秒钟被调用一次具体如何保证每秒钟被调用一次我们下面再介绍这里先介绍该函数接口的实现。其实实现也是非常简单就是每次调用都让秒针向后移动一格然后释放秒级时间轮秒针位置的所有TimerTask对象这样就会执行这个TimerTask对象的析构函数从而去执行超时处理。如果秒针走了一圈了那就让分针向前走一格然后将分钟级时间轮中分针指向位置的TimerTask对象转移到秒级时间轮的对应位置。分钟级时间轮不处理超时任务只将超时任务转移给秒级时间轮去处理。 void runTimerTask(){// 首先判断秒针是否已经走满一圈是的话就让分针进一格if (_second_tick 1 60){_minute_tick (_minute_tick 1) % _capacity;for(int i 0; i _minute_wheel[_minute_tick].size(); i){int timeout _minute_wheel[_minute_tick][i]-getTimeOut();// 分钟级时间轮的时间到了将定时任务转移到秒级时间轮对应的位置_second_wheel[timeout % 60].push_back(_minute_wheel[_minute_tick][i]);}_minute_wheel[_minute_tick].clear();}// 每一秒更新一次_tick的位置相当于每一秒时钟向后走一步_second_tick (_second_tick 1) % _capacity;// 更新完以后就销毁该时刻下对应的所有定时器任务_second_wheel[_second_tick].clear();}到这里我们就基本上实现了时间轮的所有核心操作但是现在的时间轮只有操作和管理定时任务TimerTask对象的功能定时器模块还需要计时功能没有计时功能怎么知道有没有超时呢所以接下来我们需要实现定时器模块的计时功能。
我们之前也提过muduo库的计时功能是通过Linux提供的timerfd机制实现的操作系统会为我们创建一个文件超时了就向文件中写入数据。所以我们可以将该文件的文件描述符用epoll监控起来一旦可读事件触发说明文件中有数据到来也就是说提醒我们时间到了。所以首先需要创建一个timerfd并且我们需要将超时时间设置为1s因为我们希望操作系统每秒提醒我们调用一次runTimerTask函数这样就能让时间轮的秒针每秒向前移动一格。createTimerFd函数实现如下 static int createTimerFd(){// 使用timerfd_create函数会创建一个定时器的文件描述符// 这个文件描述符是操作系统帮我们管理的一旦超时操作系统就会向文件里写入数据// 每次读取出来的数据表示距离上一次读取超时了多少次int timerfd timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd 0){LOG(timerfd_create error);abort();}// 这里将超时时间设置为1s每1s后超时一次struct itimerspec itime;itime.it_value.tv_sec 1;itime.it_value.tv_nsec 0; // 第一次超时时间为1s后itime.it_interval.tv_sec 1;itime.it_interval.tv_nsec 0; // 第一次超时后每次超时的时间间隔timerfd_settime(timerfd, 0, itime, nullptr);return timerfd;}我们在TimerWhell时间轮类的构造函数处就应该调用createTimerFd去创建timerfd并且将该文件描述符交给epoll去监控设置可读事件触发的回调函数为timeout这个函数接下来我们会实现然后启动epoll的可读事件监控这样就能监控timerfd文件描述符了由于我们设置的timerfd超时时间是1s所以操作系统每1s都会向文件描述符写入数据epoll每1s都会监控到可读事件然后调用timeout函数。构造函数的实现如下 // 构造函数需要传递进入eventLoop对象// 因为定时器任务也是需要被reactor管理的// 添加定时器任务、刷新定时器任务、取消定时器任务// 这些都会作为事件让eventLoop对象管理eventLoop对象再交给Poller对象去监控这些事件// 监控事件发生以后再让Channel对象去处理TimerWheel(EventLoop *eventLoop): _capacity(60), _second_tick(0), _minute_tick(0), _second_wheel(_capacity), _minute_wheel(_capacity),_eventLoop(eventLoop), _timerfd(createTimerFd()),_timerChannel(new Channel(_eventLoop, _timerfd)){// _timerChannel是定时器任务事件管理的对象用来管理定时器任务的事件// 这里设置_timerChannel的可读事件设置为timeOut一旦可读事件就绪就会调用timeout_timerChannel-setReadAbleCallBack(std::bind(TimerWheel::timeOut, this));// 开始可读事件的监控_timerChannel-startReadAbleEvent();}接下来是timeout函数这个函数是被绑定在timerfd文件描述符的可读事件下的timerfd的可读事件触发就会被调用所以这个函数每秒钟会被调用一次。所以我们要利用它每秒钟被调用一次的特点通过该函数去调用runTimerTask函数让秒针每秒向后走一格这样整个定时器模块的逻辑就打通了。 // 这个函数已经被绑定在_timerChannel对象中也就是说被作为可读事件监控起来了// 一旦可读事件触发就会调用这个函数// 那什么时候会触发呢当这个事件关联的文件描述符也就是_timerfd有数据到来的时候// 因为_timerfd是操作系统为我们创建的一个定时器文件是操作系统在帮我们管理// 并且我们设置了每1s超时一次所以操作系统每1s都会提醒我们一次// 也就是说每1s操作系统都会向_timerfd中写入数据那么每1s可读事件都会就绪就会调用timeout函数// 调用以后就会去读取_timerfd里的内容并且执行runTimerTask函数去处理超时任务// 这里就是定时器的精髓很好地利用操作系统的timerfd机制将文件描述符交给epoll去监控管理// 一旦超时了操作系统会通知非常牛逼void timeOut(){// 先调用readTimerFd函数去读取timerfd里的数据// times表示距离上一次读取超时了多少次int times readTimerFd();// 循环执行runTimerTask// 超时了多少次就执行多少次runTimerTask函数是将指时钟向后移动1s然后执行对应时刻的所有超时任务for (int i 0; i times; i){runTimerTask();}}至此TimerWhell时间轮类的实现基本差不多了剩下的接口比较简单就不过多介绍了这里可以直接给完整的代码
using TaskWeakPtr std::weak_ptrTimerTask;
using TaskSharedPtr std::shared_ptrTimerTask;/// brief 时间轮类用来管理定时任务对象的类实现超时处理的方式是时间轮
class TimerWheel
{
public:// 构造函数需要传递进入eventLoop对象// 因为定时器任务也是需要被reactor管理的// 添加定时器任务、刷新定时器任务、取消定时器任务// 这些都会作为事件让eventLoop对象管理eventLoop对象再交给Poller对象去监控这些事件// 监控事件发生以后再让Channel对象去处理TimerWheel(EventLoop *eventLoop): _capacity(60), _second_tick(0), _minute_tick(0), _second_wheel(_capacity), _minute_wheel(_capacity),_eventLoop(eventLoop), _timerfd(createTimerFd()),_timerChannel(new Channel(_eventLoop, _timerfd)){// _timerChannel是定时器任务事件管理的对象用来管理定时器任务的事件// 这里设置_timerChannel的可读事件设置为timeOut一旦可读事件就绪就会调用timeout_timerChannel-setReadAbleCallBack(std::bind(TimerWheel::timeOut, this));// 开始可读事件的监控_timerChannel-startReadAbleEvent();}~TimerWheel(){}// 把所有对定时任务的操作都放在一个线程中完成所以放在EventLoop线程中比较合适// 因为定时器中有个_timers成员定时器信息的操作有可能在多线程中进行因此需要考虑线程安全问题// 但是又不想加锁因为加锁耗费资源所以把所有定时操作放在一个线程中进行// 这里和EventLoop模块、线程池模块的设计解决了这种线程不安全的问题// 因为设计的时候一个EventLoop对象就会被分配一个线程并且EventLoop对象内部会绑定这个线程的ID// 这些函数将来都会作为任务通过EventLoop对象的runInLoop函数添加到任务队列// 在这里就会判断当前执行的线程是不是EventLoop对象对应的线程// 如果是的话就直接执行任务如果不是的话就加入任务队列等到下一次被EventLoop对象对应的线程调用// 这样就能保证这些addTimer类似的函数能够只被一个线程调用不会被其它线程调用// 所以临界资源_timers是安全的就不会有线程安全了void addTimer(uint64_t id, uint32_t timeout, const TaskFunc task);void refreshTimer(uint64_t id);void cancelTimer(uint64_t id);// 存在线程安全问题所以只能在EventLoop线程调用不能被其它线程调用bool hasTimer(uint64_t id);private:// 刷新或者延迟定时任务bool refreshTimerInEventLoop(uint64_t id){// 通过保存的定时器对象的weak_ptr构造一个shared_ptr出来添加到时间轮中// 首先通过定时器任务ID到_timers中查找对应的定时器任务对象的weak_ptrauto iter _timers.find(id);if (iter _timers.end()){return false;}// 找到了以后将对应的weak_ptr转换成shared_ptrTaskSharedPtr ptr iter-second.lock();// 获取这个定时器任务对象的timeout超时时间int timeout ptr-getTimeOut();if (timeout 60){int minute_timeout timeout / 60;int minute_pos (_minute_tick minute_timeout) % _capacity;_minute_wheel[minute_pos].push_back(ptr);}else{int second_pos (_second_tick timeout) % _capacity;_second_wheel[second_pos].push_back(ptr);}return true;}// 添加定时任务到EventLoop中void addTimerToEventLoop(uint64_t id, uint32_t timeout, const TaskFunc task){// 首先new一个定时器任务对象出来交给shared_ptr管理TaskSharedPtr ptr(new TimerTask(id, timeout, task));// 设置定时器任务对象的release回调函数设置为removeTimer函数// 这个release函数是用来清理TimerWheel中保存的定时器任务对象信息的ptr-setRelease(std::bind(TimerWheel::removeTimer, this, id));// 设置完以后计算超时时间如果timeout60说明要用到分钟轮if (timeout 60){int minute_timeout timeout / 60;int minute_pos (_minute_tick minute_timeout) % _capacity;_minute_wheel[minute_pos].push_back(ptr);}else{int second_pos (_second_tick timeout) % _capacity;_second_wheel[second_pos].push_back(ptr);}// 构造一个weak_ptr到_timers中保存定时器任务对象的信息_timers[id] TaskWeakPtr(ptr);}// 执行定时任务// 这个函数应该每秒钟被执行一次相当于秒针向后走了一步// 这个函数会被timeout函数调用timeout函数会在超时的时候被调用// 而TimerFd我们设置的是1s钟超时所以这里的逻辑就是每隔1s操作系统会向timerfd中写入数据代表时间到了// 然后epoll监控到timerfd文件描述符的可读事件触发就调用对应的可读事件回调函数也就是timeout函数// timeout函数再调用runTimerTask函数这样就能保证一秒钟执行这个函数一次void runTimerTask(){// 首先判断秒针是否已经走满一圈是的话就让分针进一格if (_second_tick 1 60){_minute_tick (_minute_tick 1) % _capacity;for(int i 0; i _minute_wheel[_minute_tick].size(); i){int timeout _minute_wheel[_minute_tick][i]-getTimeOut();// 分钟级时间轮的时间到了将定时任务转移到秒级时间轮对应的位置_second_wheel[timeout % 60].push_back(_minute_wheel[_minute_tick][i]);}_minute_wheel[_minute_tick].clear();}// 每一秒更新一次_tick的位置相当于每一秒时钟向后走一步_second_tick (_second_tick 1) % _capacity;// 更新完以后就销毁该时刻下对应的所有定时器任务_second_wheel[_second_tick].clear();}// 删除保存的定时器任务对象的信息也就是在_timers中的内容void removeTimer(uint64_t id){auto iter _timers.find(id);if (iter ! _timers.end()){_timers.erase(iter);}}// 取消定时器任务void cancelTimerInEventLoop(uint64_t id){// 首先看该定时器任务是否在_timers中// 如果不在的话说明没有这个定时器任务直接返回auto iter _timers.find(id);if (iter _timers.end()){return;}// 找到了以后构造shared_ptr指针对象然后去执行定时器任务对象的cancel函数// 这个函数就是将取消标志位设置为true这样在定时器任务对象析构的时候就不会去执行定时器对象要处理的任务// 只会执行release回调函数也就是删除在_timers中保存的定时器对象的信息TaskSharedPtr ptr iter-second.lock();if (ptr){ptr-cancel();}}// 创建Timerfdstatic int createTimerFd(){// 使用timerfd_create函数会创建一个定时器的文件描述符// 这个文件描述符是操作系统帮我们管理的一旦超时操作系统就会向文件里写入数据// 每次读取出来的数据表示距离上一次读取超时了多少次int timerfd timerfd_create(CLOCK_MONOTONIC, 0);if (timerfd 0){LOG(timerfd_create error);abort();}// 这里将超时时间设置为1s每1s后超时一次struct itimerspec itime;itime.it_value.tv_sec 1;itime.it_value.tv_nsec 0; // 第一次超时时间为1s后itime.it_interval.tv_sec 1;itime.it_interval.tv_nsec 0; // 第一次超时后每次超时的时间间隔timerfd_settime(timerfd, 0, itime, nullptr);return timerfd;}// 读取timerfd里的数据int readTimerFd(){// 这个times就是读取出来的内容表示距离上一次读取超时了多少次uint64_t times;int readRes read(_timerfd, times, 8);if (readRes 0){LOG(read error);abort();}return times;}// 这个函数已经被绑定在_timerChannel对象中也就是说被作为可读事件监控起来了// 一旦可读事件触发就会调用这个函数// 那什么时候会触发呢当这个事件关联的文件描述符也就是_timerfd有数据到来的时候// 因为_timerfd是操作系统为我们创建的一个定时器文件是操作系统在帮我们管理// 并且我们设置了每1s超时一次所以操作系统每1s都会提醒我们一次// 也就是说每1s操作系统都会向_timerfd中写入数据那么每1s可读事件都会就绪就会调用timeout函数// 调用以后就会去读取_timerfd里的内容并且执行runTimerTask函数去处理超时任务// 这里就是定时器的精髓很好地利用操作系统的timerfd机制将文件描述符交给epoll去监控管理// 一旦超时了操作系统会通知做到了异步的事件驱动机制非常牛逼void timeOut(){// 先调用readTimerFd函数去读取timerfd里的数据// times表示距离上一次读取超时了多少次int times readTimerFd();// 循环执行runTimerTask// 超时了多少次就执行多少次runTimerTask函数是将指时钟向后移动1s然后执行对应时刻的所有超时任务for (int i 0; i times; i){runTimerTask();}}private:int _second_tick; // 当前的秒针走到哪里释放哪里就相当于执行哪里的任务int _minute_tick; // 当前的分针int _capacity; // 时间轮表盘最大数量其实就是最大延迟时间// 这是个二维数组其实就是一个桶结构一维的每一个元素代表每1s的时间// 每秒下面挂的就是这一时刻的定时器任务对象的shared_ptrstd::vectorstd::vectorTaskSharedPtr _second_wheel; // 秒级时间轮std::vectorstd::vectorTaskSharedPtr _minute_wheel; // 分钟级时间轮// _timers是用来保存定时器任务对象的weak_ptr的key值是定时器任务对象的ID值用来索引对应的定时器任务对象// value值是保存定时器任务对象的weak_ptr// 这里用weak_ptr的原因是不会使shared_ptr的引用计数增加// 如果使用shared_ptr的话在插入到_timers中或者从_timers中获取对象的时候都会拷贝shared_ptr对象使得引用计数增加这样是不合理的std::unordered_mapuint64_t, TaskWeakPtr _timers;EventLoop *_eventLoop;int _timerfd; // 定时器描述符// 定时器任务对象的时间管理对象指针std::unique_ptrChannel _timerChannel;
};五、线程池模块
由于我们实现的是多线程模型所以必须要实现一个线程池我们要实现one loop one thread服务器主Reactor运行在主线程上只负责接收获取连接从属Reactor运行在子线程上负责处理连接的IO事件所以我们可能会创建多个线程来运行从属Reactor就需要线程池来管理这些线程。
1.LoopThread类
LoopThread类实现的是单个线程的逻辑我们希望创建单个线程对象即LoopThread对象时也意味着创建一个从属Reactor。也就是说一个子线程与一个从属Reactor绑定在一起。我们绑定线程的执行函数为threadEntry。在threadEntry这个函数内部我们创建从属Reactor然后让该从属Reactor启动监控这个启动监控其实是一个while(true)死循环也就是它会一直启动。threadEntry函数的实现如下 // 实例化EventLoop对象唤醒_cond上有可能阻塞的线程并且开始运行EventLoop模块的功能void threadEntry(){// 这个函数是每次被线程执行的函数// 每次执行都会新建一个EventLoop对象创建好之后调用start启动EventLoop loop;{std::unique_lockstd::mutex lock(_mutex);_eventLoop loop;_cond.notify_all();}loop.start();// 出了这个函数EventLoop会自动销毁// 但实际上start是一个死循环除非关闭连接或者连接出错了// 就会去调用连接关闭回调函数和错误处理回调函数来关闭连接这样才会退出// 所以也就是说这个创建的loop是贯穿一个连接整个生命周期的// 只有连接结束了这个loop才会被销毁}这样单个线程的任务就完成了LoopThread类的完整代码实现如下
class LoopThread
{
public:// 创建线程设定线程入口函数// 这里创建线程的时候将线程_thread初始化函数设置为threadEntry函数// 也就是说创建的这个线程每次会去执行threadEntry函数LoopThread():_eventLoop(nullptr), _thread(std::thread(LoopThread::threadEntry, this)){}// 返回当前线程关联的EventLoop对象指针EventLoop *getEventLoop(){EventLoop *loop nullptr;{// 这里必须加锁等待条件变量_eventLoop不为空成立才能返回_eventLoop// 因为刚创建线程可能还没有执行threadEntry函数就来获取_eventLoop的话获取到的是空值// 必须等threadEntry创建了_eventLoop对象才能返回std::unique_lockstd::mutex lock(_mutex);_cond.wait(lock, [](){return _eventLoop ! nullptr;});loop _eventLoop;}return loop;}private:// 实例化EventLoop对象唤醒_cond上有可能阻塞的线程并且开始运行EventLoop模块的功能void threadEntry(){// 这个函数是每次被线程执行的函数// 每次执行都会新建一个EventLoop对象创建好之后调用start启动EventLoop loop;{std::unique_lockstd::mutex lock(_mutex);_eventLoop loop;_cond.notify_all();}loop.start();// 出了这个函数EventLoop会自动销毁// 但实际上start是一个死循环除非关闭连接或者连接出错了// 就会去调用连接关闭回调函数和错误处理回调函数来关闭连接这样才会退出// 所以也就是说这个创建的loop是贯穿一个连接整个生命周期的// 只有连接结束了这个loop才会被销毁}private:// 互斥锁和条件变量用于实现EventLoop获取的同步关系避免线程创建了但是EventLoop还没有实例化就去获取EventLoop就会获取到nullptrstd::mutex _mutex; // 互斥锁std::condition_variable _cond; // 条件变量EventLoop *_eventLoop; // EventLoop指针变量这个对象需要在线程内实例化std::thread _thread; // EventLoop对应的线程
};2.LoopThreadPool类
LoopThreadPool类是线程池类外界通过线程池的nextEventLoop函数接口获取子线程实际上就是获取从属Reactor。这里的逻辑是当主Reactor监控到一个新连接到来的时候就从线程池的nextEventLoop函数中获取一个从属Reactor让该从属Reactor监控新连接的IO事件。线程池的完整代码如下
class LoopThreadPool
{
public:// 构造函数需要用baseLoop来初始化也就是需要用主reactor来初始化LoopThreadPool(EventLoop *baseLoop):_threadCount(0), _nextLoopIndex(0), _baseLoop(baseLoop){}// 设置线程数量void setThreadCount(int count){_threadCount count;}// 创建所有的从属线程void create(){if(_threadCount 0){_threads.resize(_threadCount);_eventLoops.resize(_threadCount);for(int i 0; i _threadCount; i){_threads[i] new LoopThread;_eventLoops[i] _threads[i]-getEventLoop();}}}// 从线程池中获取eventLoop// 这个函数接口是给TcpServer调用的当一个新连接到来的时候就会创建一个新的Connection对象// 这个Connection对象需要从线程池中拿到一个EventLoop来关联// 关联起来后往后这个Connection对象的所有操作都放在这个EventLoop对象中EventLoop *nextEventLoop(){// 如果线程池的线程数量为0就返回主eventLoopif(_threadCount 0){return _baseLoop;}_nextLoopIndex (_nextLoopIndex 1) % _threadCount;return _eventLoops[_nextLoopIndex];}
private:int _threadCount; // 从属线程的数量int _nextLoopIndex;EventLoop *_baseLoop; // 主EventLoop运行在主线程从属线程数量为0则所有操作都在_baseLoop中进行std::vectorLoopThread * _threads; // 保存所有的LoopThread对象std::vectorEventLoop * _eventLoops; // 从属线程数量大于0则从_eventLoop中进行线程EventLoop分配
};
