概述
TOC
epoll的工作方式
epoll有2种工作方式-水平触发(LT)和边缘触发(ET) ;
epoll默认状态下就是LT工作模式 ,select和poll其实也是工作在LT模式下,epoll既可以支持LT, 也可以支持ET.
这个LT ET的名字是从物理学方面来的; 可以看到LT很长,ET水平看就是一个点;
LT模式
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
- 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
仍然会立刻返回并通知socket读事件就绪- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
- 支持阻塞读写和非阻塞读写
通俗说就是,一个sock想你发消息或者你向它发,由于种种原因这部分消息只读取了一部分,那么epoll就会不断地返回这个sock就绪,直到与这个sock通信这次结束,才会恢复正常机制;
ET模式
如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式
- 当epoll检测到socket上事件就绪时, 必须立刻处理
- 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
epoll_wait 不会再返回了- 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
- ET的性能比LT性能更高( epoll_wait 返回的次数少了很多), Nginx默认采用ET模式使用epoll.
- 支支持非阻塞读写
通俗说就是,ET模式的epoll_wait,不管这次处理完没完,都不会再进行第二次了,就能让其他更多的sock都能就绪即拷贝,不在一个身上浪费很多时间.显然是更高效的
同时ET这种只来一次的无脑模式,在上层也就意味着想要读取数据完整,我们程序员就得被逼利用循环读取等方式来读取数据;
对比LT和ET
- LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把
所有的数据都处理完. - 相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到
每次就绪的文件描述符都立刻处理(完整的处理), 不让这个就绪被重复提示的话, 其实性能也是一样的. - 另一方面, ET 的代码复杂程度更高了.
高效原因:假设当sock1 和 sock2同时向server发数据;
- LT:sock1的数据发了一半,堵塞了,那我epoll_wait会直接返回,继续读sock1这个就绪事件;
- ET:我只搞你一遍,上层用户while轮询的读取的,如果你不堵塞,我while轮询直接读完整,如果你堵塞了,我读一部分放在你自己的缓冲区,然后break出while,下次再接着读你另一半,这样就不影响后续sock2的通信了,高效;
理解ET模式需要非阻塞文件描述符
- 使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 “工程实践” 上的要求
假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第
二个10k请求 ;
如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,
参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中.,下次epoll_wait立马又返回这个sock就绪,又针对这9k再io,对于其他sock来说就是低效;
此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回 (起到了高效的作用)
但是问题来了:套娃问题
- 务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
- 客户端要读到服务器的响应, 才会发送下一个请求
- 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.
所以, 为了解决上述问题(阻塞read可能会会让程序挂起,不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区;保证一定能把完整的请求都读出来.
而如果是LT模式没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 接着立刻返回没读完的文件描述符读就绪
epoll的使用场景
epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反 ;
具体要根据需求和场景特点来决定使用哪种IO模型.
对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll ;
eg(例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll )这也是能看出互联网企业服务器底层常用epoll的原因
如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适.
Reactor设计模式
Reactor简介
Reactor 释义“反应堆”,是一种事件驱动机制
- 事件驱动(event handling)
- 可以处理一个或多个输入源(one or more inputs)
- 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理
结合ET模式+epoll的多路复用:
- 同步的等待多个事件源到达(采用epoll()实现)
- 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
- 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler函数指针)
形象化为打地鼠游戏:
这一个个的Events其实就是epoll所需要监听的等待事件,如果有就绪发生,就跟地鼠冒出来一样,我们ET模式下的Reactor直接通过Dispatch将事件分发出去,我们handler处理函数被逼循环式的直到把他砸下去(循环拷贝直到没数据了),注意,Reactor分发事件出去以后,就脱离了Reactor体系,具体的业务逻辑修改handler即可,你也可以引入线程池,多进程等这类多并发技术进一步提升效率;
上图黄色就是分发出去的handler;
用Reactor模式设计一个计算器server
我们写的是一个单Reactor单线程的简易reactor,目的是理解reactor模式的工作框架,当然可以根据需要,在处理业务逻辑的地方引入线程池等技术提高效率,也可以创建多进程多Reactor,这里不深究;
// 单进程基于epoll的ET非阻塞形式设计的一个Reactor模式
// 检测事件就绪 + 对数据的读写 + 对数据的分析处理
ReactorServer.hpp
#pragma once
#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include <unistd.h>
void f(std::string s) { std::cout << s << std::endl; }//TEST
namespace ns_reactor
{
class Event; //事件类
typedef void (*callback_t)(Event &); //回调
class Reactor; //包含epoll和一个{sock:event}的map;
class Event
{
public:
int sock_;
Reactor *r_; // 指向该Event对应的Reactor,
std::string inbuffer_; //对应的sock,私有的读取缓冲区
std::string outbuffer_; //对应的sock,私有的发送缓冲区
callback_t recv_callback_; //对应的sock,读回调
callback_t send_callback_; //对应的sock,写回调
callback_t error_callback_; //对应的sock,异常回调
public:
Event() : sock_(-1), r_(nullptr)
{
recv_callback_ = nullptr;
send_callback_ = nullptr;
error_callback_ = nullptr;
}
void RegisterCallback(callback_t _recv, callback_t _send, callback_t _error) // Accept后 给非listen _sock搞回调方式
{
recv_callback_ = _recv;
send_callback_ = _send;
error_callback_ = _error;
}
~Event() {}//这里不能close(ev.sock_)噢,我们在ev的的error处理中关了sock,这里再析构会出错;
};
class Reactor
{
int epfd_; // EPOLL
std::unordered_map<int, Event> events_; // sock : Event
public:
Reactor() : epfd_(-1)
{
}
void InitReactor()
{
epfd_ = epoll_create(128);
if (epfd_ < 0)
{
std::cerr << "epoll_create error" << std::endl;
exit(1);
}
}
void AddEvent(const Event &ev, uint32_t events) //将需要监听事件add入epoll,别忘了reactor的map也要加一份
{
epoll_event tmp;
tmp.data.fd = ev.sock_;
tmp.events = events;
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, ev.sock_, &tmp) < 0) // ADD入epoll
{
std::cerr << "add error" << std::endl;
exit(-1);
}
events_.insert({ev.sock_, ev}); //插入map
std::cout << "添加事件成功, sock: " << ev.sock_ << std::endl;
}
void DelEvent(const Event &ev)
{
epoll_ctl(epfd_, EPOLL_CTL_DEL, ev.sock_, nullptr); // DEL入epoll
events_.erase(ev.sock_); //从map删掉
}
void EnableReadWrite(int sock, bool in, bool out)
{
epoll_event ev;
ev.data.fd = sock;
ev.events = (in == true ? EPOLLIN : 0) | (out == true ? EPOLLOUT : 0);
epoll_ctl(epfd_, EPOLL_CTL_MOD, sock, &ev);
}
void Dispatcher(int timeout) //派发 wait拿到 就绪了以后 根据类型甩IO任务;
{
epoll_event revs[128];
int n = epoll_wait(epfd_, revs, 128, timeout);
for (int i = 0; i < n; i++)
{
int sock = revs[i].data.fd;
uint32_t events = revs[i].events;
//有bug??
if (events & EPOLLIN)
{
if (events_[sock].recv_callback_)
events_[sock].recv_callback_(events_[sock]);
}
if (events & EPOLLOUT)
{
if (events_[sock].send_callback_)
events_[sock].send_callback_(events_[sock]);
}
}
}
~Reactor()
{
if (epfd_ >= 0)
close(epfd_);
}
};
}
ReactorServer.cc
#include "ReactorServer.hpp"
#include "sock.hpp"
#include "Accepter.hpp"
using namespace ns_reactor;
using namespace ns_sock;
int main()
{
// 建立epoll对象; 创建套接字 -> ET模式 建立连接 ->调派发
Reactor *R = new Reactor();//单Reactor 全局就一个。
R->InitReactor();
int listen_sock = Sock::Socket();
Sock::Bind(listen_sock, 8080); // Bind封装了error处理
Sock::Listen(listen_sock);
SetNoBlock(listen_sock);
//有listen_sock了
Event ev;
ev.sock_ = listen_sock;
ev.r_ = R;
// Accepter链接管理器 也相当于一种回调函数; listensock-->的epollin == Accepter链接管理器
ev.RegisterCallback(Accepter, nullptr, nullptr);
R->AddEvent(ev, EPOLLIN | EPOLLET);
// listen搞完 链接自动被管理了,进入事件派发器,服务器正式开始运作;
int timeout = 1000;
while (true)
{
R->Dispatcher(timeout); // epoll_wait派发!
}
return 0;
}
sock.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
void SetNoBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); //文件描述符的属性取出来存入fl中
if (fl < 0)
{ //执行失败返回-1并报错
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK); //设置fl | O_NONBLOCK 类似位图填充类型设置
}
namespace ns_sock
{
enum
{
SOCKET_ERR = 2,
BIND_ERR,
LISTEN_ERR
};
const int g_backlog = 5;
class Sock
{
public:
static int Socket()
{
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
std::cerr << "socket error!" << std::endl;
exit(SOCKET_ERR);
}
int opt = 1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //将sock搞成非阻塞的 满足ET模式
return sock;
}
static void Bind(const int &sock, const u_int16_t &port)
{
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
{
std::cerr << "bind error!" << std::endl;
exit(BIND_ERR);
}
}
static void Listen(const int &sock)
{
if (listen(sock, g_backlog) < 0)
{
std::cerr << "bind error!" << std::endl;
exit(LISTEN_ERR);
}
}
static void SetNonBlock(int sock)
{
int fl = fcntl(sock, F_GETFL);
fcntl(sock, F_SETFL, fl | O_NONBLOCK);
}
};
}
Accepter.hpp
#pragma once
#include <iostream>
#include <string>
#include <cerrno>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ReactorServer.hpp"
#include "Callback.hpp"
using namespace ns_reactor;
void Accepter(Event &event) //只会被listensock调;
{
std::cout << "Accepter 回调方法被调用" << std::endl;
// ET 模式 连接事件到来,在一个时间段,有很多的连接到来的!保证读完那就得while读干净!
while (1)
{
sockaddr_in peer;
socklen_t len = sizeof(peer);
int listen_sock = event.sock_;
int sock = accept(listen_sock, (sockaddr *)&peer, &len);
// SetNoBlock(sock);//设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。 accept轮询返回,放这里的话会出问题 自己研究;
if (sock > 0)
{
SetNoBlock(sock); //设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。
Event ev;
ev.r_ = event.r_;
ev.sock_ = sock;
ev.RegisterCallback(Recver, Sender, Errorer); //为到来的sock注册三个Request Handler
(event.r_)->AddEvent(ev, EPOLLIN | EPOLLET); //甩入eopll中监听
}
else if (errno == EAGAIN || errno == EWOULDBLOCK)
{ //非阻塞sock中 返回这两个errno不是出错了,而是 暂时没链接了--(这批 ET 模式下 得 连接读完了)!
break;
}
else if (errno == EINTR)
{
//当前的accept调用,被信号中断,没出错,并不代表底层没有新的链接了
continue;
}
else if (sock < 0)
{ //真正出错了
std::cerr << "accept error" << std::endl;
continue; //读别的链接
}
}
}
Callback.hpp
#pragma once
#include "ReactorServer.hpp"
#include "Util.hpp"
using namespace ns_reactor;
/***************************************
* return:
* 0: 本轮读取完毕
* -1: 读取过程中出错了
*
* sock: 要读取的fd
* out: 输出型参数
*
* ************************************/
//读好了直接将返回的 内容 放入该sock的 out缓冲区作为输出型参数本质是 sock的 string in
int RecvHepler(int sock, std::string *out)
{
while (1) // ET模式 被逼循环直到完;
{
char buffer[1024]; //临时的
ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);
if (s > 0)
{ //读到了,但是不能保证读完了
buffer[s] = 0; //上'