我是靠谱客的博主 欣慰大雁,最近开发中收集的这篇文章主要介绍Reactor设计模式 -- 基于EpollET模式epoll的工作方式Reactor设计模式总结,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

TOC

epoll的工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET) ;

epoll默认状态下就是LT工作模式 ,select和poll其实也是工作在LT模式下,epoll既可以支持LT, 也可以支持ET.

这个LT ET的名字是从物理学方面来的; 可以看到LT很长,ET水平看就是一个点;
在这里插入图片描述

LT模式

  • 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分
  • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
    仍然会立刻返回并通知socket读事件就绪
  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
  • 支持阻塞读写和非阻塞读写

通俗说就是,一个sock想你发消息或者你向它发,由于种种原因这部分消息只读取了一部分,那么epoll就会不断地返回这个sock就绪,直到与这个sock通信这次结束,才会恢复正常机制;

ET模式

如果我们在第1步将socket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式

  • 当epoll检测到socket上事件就绪时, 必须立刻处理
  • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,
    epoll_wait 不会再返回了
  • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会
  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多), Nginx默认采用ET模式使用epoll.
  • 支支持非阻塞读写

通俗说就是,ET模式的epoll_wait,不管这次处理完没完,都不会再进行第二次了,就能让其他更多的sock都能就绪即拷贝,不在一个身上浪费很多时间.显然是更高效的

同时ET这种只来一次的无脑模式,在上层也就意味着想要读取数据完整,我们程序员就得被逼利用循环读取等方式来读取数据;

对比LT和ET

  • LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把
    所有的数据都处理完.
  • 相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些. 但是在 LT 情况下如果也能做到
    每次就绪的文件描述符都立刻处理(完整的处理), 不让这个就绪被重复提示的话, 其实性能也是一样的.
  • 另一方面, ET 的代码复杂程度更高了.

高效原因:假设当sock1 和 sock2同时向server发数据;

  • LT:sock1的数据发了一半,堵塞了,那我epoll_wait会直接返回,继续读sock1这个就绪事件;
  • ET:我只搞你一遍,上层用户while轮询的读取的,如果你不堵塞,我while轮询直接读完整,如果你堵塞了,我读一部分放在你自己的缓冲区,然后break出while,下次再接着读你另一半,这样就不影响后续sock2的通信了,高效;

理解ET模式需要非阻塞文件描述符

  • 使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 “工程实践” 上的要求

假设这样的场景: 服务器接受到一个10k的请求, 会向客户端返回一个应答数据. 如果客户端收不到应答, 不会发送第
二个10k请求 ;
在这里插入图片描述

如果服务端写的代码是阻塞式的read, 并且一次只 read 1k 数据的话(read不能保证一次就把所有的数据都读出来,
参考 man 手册的说明, 可能被信号打断), 剩下的9k数据就会待在缓冲区中.,下次epoll_wait立马又返回这个sock就绪,又针对这9k再io,对于其他sock来说就是低效;

在这里插入图片描述

此时由于 epoll 是ET模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回 (起到了高效的作用)

但是问题来了:套娃问题

  • 务器只读到1k个数据, 要10k读完才会给客户端返回响应数据.
  • 客户端要读到服务器的响应, 才会发送下一个请求
  • 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数据.

在这里插入图片描述

所以, 为了解决上述问题(阻塞read可能会会让程序挂起,不一定能一下把完整的请求读完), 于是就可以使用非阻塞轮训的方式来读缓冲区;保证一定能把完整的请求都读出来.

而如果是LT模式没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 接着立刻返回没读完的文件描述符读就绪

epoll的使用场景

epoll的高性能, 是有一定的特定场景的. 如果场景选择的不适宜, epoll的性能可能适得其反 ;

具体要根据需求和场景特点来决定使用哪种IO模型.

  1. 对于多连接, 且多连接中只有一部分连接比较活跃时, 比较适合使用epoll ;

    eg(例如, 典型的一个需要处理上万个客户端的服务器, 例如各种互联网APP的入口服务器, 这样的服务器就很适合epoll )这也是能看出互联网企业服务器底层常用epoll的原因

  2. 如果只是系统内部, 服务器和服务器之间进行通信, 只有少数的几个连接, 这种情况下用epoll就并不合适.

Reactor设计模式

Reactor简介

Reactor 释义“反应堆”,是一种事件驱动机制

  1. 事件驱动(event handling)
  2. 可以处理一个或多个输入源(one or more inputs)
  3. 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理

在这里插入图片描述

结合ET模式+epoll的多路复用:

  1. 同步的等待多个事件源到达(采用epoll()实现)
  2. 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch
  3. 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler函数指针)

形象化为打地鼠游戏:

在这里插入图片描述

这一个个的Events其实就是epoll所需要监听的等待事件,如果有就绪发生,就跟地鼠冒出来一样,我们ET模式下的Reactor直接通过Dispatch将事件分发出去,我们handler处理函数被逼循环式的直到把他砸下去(循环拷贝直到没数据了),注意,Reactor分发事件出去以后,就脱离了Reactor体系,具体的业务逻辑修改handler即可,你也可以引入线程池,多进程等这类多并发技术进一步提升效率;
在这里插入图片描述

上图黄色就是分发出去的handler;

用Reactor模式设计一个计算器server

我们写的是一个单Reactor单线程的简易reactor,目的是理解reactor模式的工作框架,当然可以根据需要,在处理业务逻辑的地方引入线程池等技术提高效率也可以创建多进程多Reactor,这里不深究;

// 单进程基于epoll的ET非阻塞形式设计的一个Reactor模式
// 检测事件就绪 + 对数据的读写 + 对数据的分析处理

ReactorServer.hpp

#pragma once

#include <iostream>
#include <vector>
#include <string>
#include <unordered_map>
#include <sys/epoll.h>
#include <unistd.h>
void f(std::string s) { std::cout << s << std::endl; }//TEST

namespace ns_reactor
{
    class Event;                         //事件类
    typedef void (*callback_t)(Event &); //回调

    class Reactor; //包含epoll和一个{sock:event}的map;

    class Event
    {
    public:
        int sock_;
        Reactor *r_; // 指向该Event对应的Reactor,

        std::string inbuffer_;  //对应的sock,私有的读取缓冲区
        std::string outbuffer_; //对应的sock,私有的发送缓冲区

        callback_t recv_callback_;  //对应的sock,读回调
        callback_t send_callback_;  //对应的sock,写回调
        callback_t error_callback_; //对应的sock,异常回调
    public:
        Event() : sock_(-1), r_(nullptr)
        {
            recv_callback_ = nullptr;
            send_callback_ = nullptr;
            error_callback_ = nullptr;
        }
        void RegisterCallback(callback_t _recv, callback_t _send, callback_t _error) // Accept后 给非listen _sock搞回调方式
        {
            recv_callback_ = _recv;
            send_callback_ = _send;
            error_callback_ = _error;
        }
        ~Event() {}//这里不能close(ev.sock_)噢,我们在ev的的error处理中关了sock,这里再析构会出错;
    };

    class Reactor
    {
        int epfd_;                              // EPOLL
        std::unordered_map<int, Event> events_; // sock : Event
    public:
        Reactor() : epfd_(-1)
        {
        }
        void InitReactor()
        {
            epfd_ = epoll_create(128);
            if (epfd_ < 0)
            {
                std::cerr << "epoll_create error" << std::endl;
                exit(1);
            }
        }

        void AddEvent(const Event &ev, uint32_t events) //将需要监听事件add入epoll,别忘了reactor的map也要加一份
        {
            epoll_event tmp;
            tmp.data.fd = ev.sock_;
            tmp.events = events;
            if (epoll_ctl(epfd_, EPOLL_CTL_ADD, ev.sock_, &tmp) < 0) // ADD入epoll
            {
                std::cerr << "add error" << std::endl;
                exit(-1);
            }

            events_.insert({ev.sock_, ev}); //插入map
            std::cout << "添加事件成功, sock: " << ev.sock_ << std::endl;
        }

        void DelEvent(const Event &ev)
        {
            epoll_ctl(epfd_, EPOLL_CTL_DEL, ev.sock_, nullptr); // DEL入epoll

            events_.erase(ev.sock_); //从map删掉
        }

        void EnableReadWrite(int sock, bool in, bool out)
        {
            epoll_event ev;
            ev.data.fd = sock;
            ev.events = (in == true ? EPOLLIN : 0) | (out == true ? EPOLLOUT : 0);
            epoll_ctl(epfd_, EPOLL_CTL_MOD, sock, &ev);
        }
        void Dispatcher(int timeout) //派发 wait拿到 就绪了以后 根据类型甩IO任务;
        {
            epoll_event revs[128];
            int n = epoll_wait(epfd_, revs, 128, timeout);

            for (int i = 0; i < n; i++)
            {
                int sock = revs[i].data.fd;
                uint32_t events = revs[i].events;

                //有bug??
                if (events & EPOLLIN)
                {
                    if (events_[sock].recv_callback_)
                        events_[sock].recv_callback_(events_[sock]);
                }

                if (events & EPOLLOUT)
                {
                    if (events_[sock].send_callback_)
                        events_[sock].send_callback_(events_[sock]);
                }
            }
        }

        ~Reactor()
        {
            if (epfd_ >= 0)
                close(epfd_);
        }
    };
}

ReactorServer.cc

#include "ReactorServer.hpp"
#include "sock.hpp"
#include "Accepter.hpp"

using namespace ns_reactor;
using namespace ns_sock;

int main()
{
    // 建立epoll对象;  创建套接字   ->   ET模式 建立连接  ->调派发
    
    Reactor *R = new Reactor();//单Reactor 全局就一个。
    R->InitReactor();

    int listen_sock = Sock::Socket();
    Sock::Bind(listen_sock, 8080); // Bind封装了error处理
    Sock::Listen(listen_sock);
    SetNoBlock(listen_sock);
    //有listen_sock了
    Event ev;
    ev.sock_ = listen_sock;
    ev.r_ = R;
    // Accepter链接管理器 也相当于一种回调函数; listensock-->的epollin == Accepter链接管理器
    ev.RegisterCallback(Accepter, nullptr, nullptr);

    R->AddEvent(ev, EPOLLIN | EPOLLET);

    // listen搞完 链接自动被管理了,进入事件派发器,服务器正式开始运作;
    int timeout = 1000;

    while (true)
    {
        R->Dispatcher(timeout); // epoll_wait派发!
    }

    return 0;
}

sock.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstdlib>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
void SetNoBlock(int fd)
{
    int fl = fcntl(fd, F_GETFL); //文件描述符的属性取出来存入fl中
    if (fl < 0)
    { //执行失败返回-1并报错
        perror("fcntl");
        return;
    }
    fcntl(fd, F_SETFL, fl | O_NONBLOCK); //设置fl | O_NONBLOCK 类似位图填充类型设置
}
namespace ns_sock
{
    enum
    {
        SOCKET_ERR = 2,
        BIND_ERR,
        LISTEN_ERR
    };

    const int g_backlog = 5;

    class Sock
    {
    public:
        static int Socket()
        {
            int sock = socket(AF_INET, SOCK_STREAM, 0);
            if (sock < 0)
            {
                std::cerr << "socket error!" << std::endl;
                exit(SOCKET_ERR);
            }
            int opt = 1;
            setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //将sock搞成非阻塞的  满足ET模式
            return sock;
        }

        static void Bind(const int &sock, const u_int16_t &port)
        {
            struct sockaddr_in local;
            bzero(&local, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(port);
            local.sin_addr.s_addr = INADDR_ANY;

            if (bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0)
            {
                std::cerr << "bind error!" << std::endl;
                exit(BIND_ERR);
            }
        }

        static void Listen(const int &sock)
        {
            if (listen(sock, g_backlog) < 0)
            {
                std::cerr << "bind error!" << std::endl;
                exit(LISTEN_ERR);
            }
        }
        static void SetNonBlock(int sock)
        {
            int fl = fcntl(sock, F_GETFL);
            fcntl(sock, F_SETFL, fl | O_NONBLOCK);
        }
    };
}

Accepter.hpp

#pragma once

#include <iostream>
#include <string>
#include <cerrno>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "ReactorServer.hpp"
#include "Callback.hpp"

using namespace ns_reactor;
void Accepter(Event &event) //只会被listensock调;
{

    std::cout << "Accepter 回调方法被调用" << std::endl;
    // ET 模式 连接事件到来,在一个时间段,有很多的连接到来的!保证读完那就得while读干净!
    while (1)
    {
        sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int listen_sock = event.sock_;
        int sock = accept(listen_sock, (sockaddr *)&peer, &len);
        // SetNoBlock(sock);//设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。 accept轮询返回,放这里的话会出问题 自己研究;
        if (sock > 0)
        {
            SetNoBlock(sock); //设置 ET 模式 注意 如果在accept前设置又bug 会显示accept错误。。。。
            Event ev;
            ev.r_ = event.r_;
            ev.sock_ = sock;
            ev.RegisterCallback(Recver, Sender, Errorer); //为到来的sock注册三个Request Handler
            (event.r_)->AddEvent(ev, EPOLLIN | EPOLLET);  //甩入eopll中监听
        }

        else if (errno == EAGAIN || errno == EWOULDBLOCK)
        { //非阻塞sock中 返回这两个errno不是出错了,而是 暂时没链接了--(这批 ET 模式下 得 连接读完了)!

            break;
        }

        else if (errno == EINTR)
        {
            //当前的accept调用,被信号中断,没出错,并不代表底层没有新的链接了
            continue;
        }
        else if (sock < 0)
        { //真正出错了
            std::cerr << "accept error" << std::endl;
            continue; //读别的链接
        }
    }
}

Callback.hpp

#pragma once

#include "ReactorServer.hpp"
#include "Util.hpp"
using namespace ns_reactor;

/***************************************
 * return:
 * 0: 本轮读取完毕
 * -1: 读取过程中出错了
 *
 * sock: 要读取的fd
 * out: 输出型参数
 *
 * ************************************/

//读好了直接将返回的 内容 放入该sock的 out缓冲区作为输出型参数本质是 sock的  string in
int RecvHepler(int sock, std::string *out)
{

    while (1) // ET模式 被逼循环直到完;
    {
        char buffer[1024]; //临时的
        ssize_t s = recv(sock, buffer, sizeof(buffer) - 1, 0);

        if (s > 0)
        {                  //读到了,但是不能保证读完了
            buffer[s] = 0; //上'',进而可以string操作
            (*out) += buffer;
        }
        else if (s < 0) //出错,但是读完也是存在于出错中的一种 errno == EAGAIN || errno == EWOULDBLOCK
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return 0; //读完,没数据了
            else if (errno == EINTR)
                continue; //信号中断 继续读
            else
                return -1; //真有错了,error
        }
        else           // s == 0
            return -1; //被关闭,同上有错
    }
}

//
void Recver(Event &event)
{
    //讲 读数据 和 业务(数据)操作 过程解耦;
    if (-1 == RecvHepler(event.sock_, &(event.inbuffer_)))
    {
        //出错 or 被关闭 --同一给error_handler
        if (event.error_callback_)
            event.error_callback_(event);
        return;
    }

    //读取过程完成了,下来就是业务过程;
    // 往后我们所写的内容,已经和Reactor无关了!全部都是数据分析与处理

    //定制协议:
    // 1+1X2*3X3*5X7*9X9*10X9*
    //  X: 叫做报文和报文之间的分割符
    // 类似1+1:一个完整报文,协议解析, 解决粘包问题
     std::cout<<event.inbuffer_<<std::endl;

    std::vector<std::string> packages; //存"1+1" "2*2"这种等待处理的子段;
    Util::StringSplit(event.inbuffer_, &packages, "X");

    for (int i = 0; i < packages.size(); i++) //
    {
        for (int j = 0; j < packages[i].size(); j++)
        {
            char op = packages[i][j];

            if (op == '+' || op == '-' || op == '*' || op == '/' || op == '%') //
            {

                int a = atoi(packages[i].substr(0, j).c_str()); // string -> char*c
                int b = atoi(packages[i].substr(j + 1).c_str());
                int ret;

                std::cout << a << " " << b << " " << op << std::endl;

                switch (op)
                {
                case '+':
                    ret = a + b;
                    break;
                case '-':
                    ret = a - b;
                    break;
                case '*':
                    ret = a * b;
                    break;
                case '/':
                    ret = a / b;
                    break;
                case '%':
                    ret = a % b;
                    break;
                }
                //构造回复报文
                std::string respond;
                respond += packages[i];
                respond += "=";
                respond += std::to_string(ret);
                respond += "X"; //添加报文和报文的分隔符

                // 5. 发送的核心:a. 不是我们调用send! 是甩给Reactor!b. 还需要将报文添加到outbuffer中即可!
                event.outbuffer_ += respond;

                // 6. 多路转接中,一般EPOLLIN是常设的, 而EPOLLOUT是按需设置的
                (event.r_)->EnableReadWrite(event.sock_, true, true);
            }
        }

        // out string搞定可以监听写事件了;
    }

    // std::cout<<"计算结果:"<<event.outbuffer_<<std::endl;
}

// send_string: 输入输出型参数
// ret >  0 : 缓冲区数据全部发完
// ret == 0 : 本轮发送完, 但是缓冲区还有数据
// ret < 0  : 发送失败
int SenderHepler(Event &event)
{
    int total = event.outbuffer_.size(); //还剩多少i

    while (1)
    {
        int s = send(event.sock_, event.outbuffer_.c_str(), total, 0);

        if (s > 0)
        {
            if (total <= s)
            { //发成功 全完
                event.outbuffer_.erase(0);

                return 1;
            }
            else if (total > s)
            {
                event.outbuffer_.erase(0, s); //头删
                total -= s;
                continue;
            }
        }
        else if (s < 0)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                //全部写完
                event.outbuffer_.erase(0);
                return 1;
            }
            else if (errno == EINTR)
                continue; //信号中断 继续读
            else
            {
                //真出错;
                return -1;
            }
        }
        else
        {
            return -1;
        }
    }
}
void Sender(Event &event)
{

    if (-1 == SenderHepler(event))
    {

        if (event.error_callback_)
            event.error_callback_(event);
        return;
    }

    (event.r_)->EnableReadWrite(event.sock_, true, false); //写完之后关掉了
}

void Errorer(Event &event) // error 我们就关sock 然后退出
{
    std::cout << "call Errorer... closed sock me too" << std::endl;
    event.r_->DelEvent(event);
    close(event.sock_); //关闭该文件描述符
}

Util.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>

class Util
{
    // in 解析完 还得删了,不然占空间;
public:
    static void StringSplit(std::string &in, std::vector<std::string> *out, const std::string &sep)
    {
        // 1+1X 2*3X 3*3X 45*
        while (true)
        {

            auto pos = in.find(sep); //返回sep的index
            if (pos == std::string::npos)
            {
                //没有找到分割符 结束分包;
                break;
            }
            std::string sub = in.substr(0, pos); //[)

            out->push_back(sub);           // out尾插
            in.erase(0, pos + sep.size()); // in头删
        }
    }
};

运行结果

client:

在这里插入图片描述

server:

在这里插入图片描述

总结

ET和LT是IO的两种模式;
Reactor是一种上层设计,效率不同模式,重点是事件派发,与ET,LT无关;

区别:

Reactor 模式:非阻塞同步 : 这里的同步io是指IO等待由内核完成,就绪了通知你你来拷贝;(早期服务器)Linux特别常用;

Proactor前摄式;非阻塞异步 : 这里的异步io是指IO等待+拷贝数据都直接由内核完成; (显然异步在实际场景更高效;)

我们上面设计的单Reactor单线程的简易的reactor,是基于Epoll的ET模式下的Reactor,因为将io拷贝操作也写了接口,交给了Reactor管理,因此来说算是一个 半同步半异步的Reactor 反应堆模式;

最后

以上就是欣慰大雁为你收集整理的Reactor设计模式 -- 基于EpollET模式epoll的工作方式Reactor设计模式总结的全部内容,希望文章能够帮你解决Reactor设计模式 -- 基于EpollET模式epoll的工作方式Reactor设计模式总结所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部