我是靠谱客的博主 娇气小兔子,最近开发中收集的这篇文章主要介绍Netty-IO模型,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Netty的介绍

  1. Netty是由JBOSS提供的一个Java的开源框架。
  2. Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
  3. Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用
  4. Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
  5. 要透彻理解Netty,需要先学习NIO
    在这里插入图片描述

原生Java-NIO存在的问题

  1. NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  2. 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程设计到Reactor模式,必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序。
  3. 开发工作量和难度非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等。
  4. JDK NIO的Bug:臭名昭著的Epoll Bug,会导致Selector空轮循,最终导致CPU100%,知道JDK1.7该问题依旧存在,没有被根本解决

Netty的优点

  1. 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰的分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。
  2. 使用方便:详细记录的Javadoc,用户指南和示例:没有吧其他的依赖。
  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
  4. 安全:完整的SSL/TLS和StartTLS支持
  5. 社区活跃、不断更新。

Reactor模式

I/O复用结合线程池,就是Reactor模式基本设计思想,如图:
说明:

  1. Reactor模式,通过一个或多个输入同时传递给服务器的模型(基于事件驱动)
  2. 服务器程序处理传入的多个请求,并将他们同步分派到响应的处理线程,因此Reactor模式也叫分发模式
  3. Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程)这点就是网络服务器高并发处理关键
    在这里插入图片描述

Reactor模式的核心组成:

Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
Handlers:处理程序执行IO事件要完成的实际事件。

单Reactor单线程模式

在这里插入图片描述
在这里插入图片描述

单Reactor多线程模式

在这里插入图片描述

在这里插入图片描述

方案优缺点:
优点:可以充分的利用多核cpu的处理能力
缺点:线程池中的多线程数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在单线程中运行,高并发场景下容易出现性能瓶颈

主从Reactor多线程模式

在这里插入图片描述

针对前面单个Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行
方案说明:

  1. Reactor主线程MainReactor对象通过seclect监听连接事件,收到事件后,通过Acceptor处理连接事件
  2. 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor
  3. SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
  4. 当有新的事件发生时,SubReactor就会调用对应的handler线程处理
  5. handler通过read读取数据,分发给后面的worker线程处理
  6. worker线程池分配独立的worker线程进行具体的业务处理,并返回结果
  7. handler收到响应的结果后,在通过send将结果返回给client
  8. Reactor主线程可以对应多个Reactor子线程,即MainReactor可以关联多个SubReactor

主从Reactor优缺点:
优点:
(1)父线程与子线程的数据交互简单职责明确,父线程只需接收新连接,子线程完成后续的业务处理
(2)Reactor主线程只需要接收新连接传给子线程,子线程无需返回数据
缺点:
编程复杂度较高

Netty模型

在这里插入图片描述

工作原理说明:
(1)Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkGroup专门负责网络的读写
(2)BossGroup和WorkerGroup类型都是NioEventLoopGroup
(3)NIOEventLoopGroup相当于一个事件循环组,这个组中含有多个时间循环,每一个时间循环是NIOEventLoop
(4)NIOEventLoop表示一个不断循环的执行处理任务的线程,每个NIOEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
(5)NIOEventLoopGroup可以有多个线程,即可以含有多个NIOEventLoop
(6)每个Boss NIOEventLoopGroup循环执行的步骤有3步
1、 轮询accept事件
2、 处理accept事件,与client建立连接,生成NIOSocketChannel,并将其注册到某个Worker NIOEventLoop上的Seclector
3、 处理任务队列的任务,即runAllTasks
(7)每个Worker NIOEventLoop 循环执行步骤
1、 轮询read / write事件
2、 处理IO事件,在对应的NIOSocketChannel处理
3、 处理任务队列的任务,即runAllTasks
(8)每个WorkerNIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了Channel,即通过pipeline可以获得对应的通道。

简单来看Netty这个模型中包含这样两组对象:
BossGroup(NIOEventLoopGroup)、WorkerGroup(NIOEventLoopGroup)
这两个对象中包含多个具体的NioEventLoop对象
NioEventLoop对象中包含Selector对象和Taskqueue队列
这两个对象通过Channel对象来连接,Channel对象在连接workergroup组中的对象时,与pipeline对象互相包含,并且添加了自定义的handler对象(用来具体处理消息的对象)

案例源码分析Netty模型:
我们主要通过案例来debug观察模型的具体实现,主要观察以下几个问题:
在这里插入图片描述
(1)我们验证模型图的上图这个部分,BossGroup和WorkerGroup本质都是NIOEventLoopGroup,其子线程数默认是cpu核数*2
如下图所示,可以通过初始化设置子线程数,也就是Group的大小,Group中是多个NIOEventLoop对象
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

(2)我们验证模型图的上图这个部分,看一下NioEventLoop对象的结构,每个NioEventLoop对象都有单独的selector和TaskQueue队列,同样的,bossGroup和workerGroup结构都是一样的,只不过职责不同而已,所以workerGroup中是多个NioEventLoop对象,也就增强了并发能力。
在这里插入图片描述
(3)WorkGroup中默认为8个线程,客户端请求在8个线程中怎么轮询分配
我们在NettyServerhandler对象的read函数中输出当前线程名来进行验证

在这里插入图片描述

在这里插入图片描述

(4)我们验证模型图的上图这个部分,我们通过handler中read函数中的ctx上下文来输出看一下Channel和pipeline这两个对象的结构。pipeline和channel是相互包含的关系。并且全部包含在ctx这个对象中。
pipeline的结构,其中可以获取到对应的channel
在这里插入图片描述
Channel的结构,同样可以获取到pipeline,还可以获取到客户端地址和服务端地址,以及当前所在的eventLoop对象
在这里插入图片描述
简单的案例代码如下:共4部分,NettyServer、NettyClient、NettyServerhandler、NettyClienthandler
NettyServer:

//NettyServer
package com.sgg.Netty.simple;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //创建BossGroup和WorkerGroup
        //说明
        //1、创建两个线程组BossGroup和WorkerGroup
        //2、BossGroup只负责处理请求,真正和客户端的业务处理会交给WorkerGroup
        //3、两个都是无限循环
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //创建服务端的启动对象,配置启动参数
        ServerBootstrap bootstrap = new ServerBootstrap();

        //使用链式变成来进行设置
        bootstrap.group(bossGroup,workerGroup)       //设置两个线程组
                .channel(NioServerSocketChannel.class)     //使用NioServerSocketChannel作为服务器的通道实现
                .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                .childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {         //创建一个通道测试对象
                    //给pipeline设置处理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyServerhandler());    //向这个管道增加handler处理器
                    }
                });

        System.out.println(".....服务器准备好了");

        //绑定一个端口并且同步,生成一个ChannelFuture对象
        ChannelFuture cf = bootstrap.bind(6668).sync();

        //对关闭通道进行监听
        cf.channel().closeFuture().sync();

    }
}

NettyClient:

//NettyClient
package com.sgg.Netty.simple;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //客户端需要一个事件循环组
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try{
            //创建客户端启动对象
            //注意客户端不是ServerBootstrap,是Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //设置相关参数
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClienthandler());//加入自己的handler
                        }
                    });
            System.out.println("客户端OK");
            //启动客户端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6668).sync();
            //给关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        }finally {
            //关闭
            eventLoopGroup.shutdownGracefully();
        }


    }
}

NettyServerhandler

//NettyServerhandler
package com.sgg.Netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

/**
 * 我们自定义一个handler,相当于我们自定义了一个消息的处理对象,
 * 但是在netty中,我们想实现这个handler,需要继承某个netty规定好的模板
 *
 * 这样我们定义的handler才可以使用,具备handler的功能
 */


public class NettyServerhandler extends ChannelInboundHandlerAdapter {

    //重写读取数据的方法

    /**
     *
     * @param ctx 上下文对象,含有管道pipeline、通道、地址
     * @param msg 客户端发送的数据(object形式)
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器端线程:"+Thread.currentThread().getName());
        System.out.println("server ctx = "+ctx);
        System.out.println("Channel和pipeline的结构如下");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline();      //本质是一个双向链表
        //将msg转换成ByteBuf,这个buffer类是Netty提供的,并不是NIO的buffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将数据写到缓存并刷新到你客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
    }

    /**
     * 处理异常,一般需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }


}

NettyServerhandler

//NettyServerhandler
package com.sgg.Netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

/**
 * 我们自定义一个handler,相当于我们自定义了一个消息的处理对象,
 * 但是在netty中,我们想实现这个handler,需要继承某个netty规定好的模板
 *
 * 这样我们定义的handler才可以使用,具备handler的功能
 */


public class NettyServerhandler extends ChannelInboundHandlerAdapter {

    //重写读取数据的方法

    /**
     *
     * @param ctx 上下文对象,含有管道pipeline、通道、地址
     * @param msg 客户端发送的数据(object形式)
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服务器端线程:"+Thread.currentThread().getName());
        System.out.println("server ctx = "+ctx);
        System.out.println("Channel和pipeline的结构如下");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline();      //本质是一个双向链表
        //将msg转换成ByteBuf,这个buffer类是Netty提供的,并不是NIO的buffer
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客户端发送的消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将数据写到缓存并刷新到你客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
    }

    /**
     * 处理异常,一般需要关闭通道
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }


}

NettyClienthandler

package com.sgg.Netty.simple;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClienthandler extends ChannelInboundHandlerAdapter {
    //当通道就绪就会触发该方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client"+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端~", CharsetUtil.UTF_8));
    }
    //当通道有读取事件时,会触发该方法
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器的地址是:"+ctx.channel().remoteAddress());
    }

    //异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }


}

在这里插入图片描述

Netty异步模型

在这里插入图片描述
Future-Listener机制:
(1)当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作的执行状态,注册监听函数来执行完成后的操作。
(2)常见有如下操作:

  • 通过isDone来判断当前操作是否完成;
  • 通过isSuccess来判断操作是否成功;
  • 通过getCause来获取已完成的当前操作失败的原因; 通过isCancelled来判断已完成的操作是否呗取消;
  • 通过addListener来注册监听器,当操作已完成,将会通知指定的监听器,如果Future对象已完成,则通知指定的监听器
//Future-Listener机制
//第一种写法
//绑定一个端口并且同步,生成一个ChannelFuture对象
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //给cf注册监听器,监控我们关心的事件
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if(cf.isSuccess()){
                    System.out.println("监听端口成功");
                }else{
                    System.out.println("监听端口失败");
                }
            }
        });
//第二种写法
//绑定一个端口并且同步,生成一个ChannelFuture对象
        bootstrap.bind(6668).addListener(future->{    
            if(cf.isSuccess()){
                System.out.println("监听端口成功");
            }else{
                System.out.println("监听端口失败");
            }   
        });

(3)相比传统阻塞IO,执行IO操作后线程会被阻塞住,知道操作完成;异步处理的好处是不会造成线程阻塞,线程在IO操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

快速入门实例-HTTP服务

我们实现一个HTTP的服务器不需要写客户端,直接从浏览器访问即可,所以我们的服务端包含3个文件,分别是TestServer、TestServerInitializer、TesthttpServerhandler
TestServer:

package com.sgg.Netty.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TestServer {
    public static void main(String[] args) throws Exception{
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try{
            ServerBootstrap bootstrap = new ServerBootstrap();

            bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态
                    .childHandler(new TestServerInitializer());

            System.out.println(".....服务器准备好了");

            ChannelFuture cf = bootstrap.bind(8849).sync();
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if(cf.isSuccess()){
                        System.out.println("监听端口成功");
                    }else{
                        System.out.println("监听端口失败");
                    }
                }
            });
            cf.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

TestServerInitializer

package com.sgg.Netty.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //向管道加入处理器

        //得到管道
        ChannelPipeline pipeline = socketChannel.pipeline();
        //加入一个netty提供的httpServerCodec codec =>[coder - decoder]
        //1、HttpServerCodec是Netty提供的处理http的编-解码器
        pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
        //2、增加一个自定义的handler
        pipeline.addLast("MyTesthttpServerhandler",new TesthttpServerhandler());
    }
}

TesthttpServerhandler

package com.sgg.Netty.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;


/**
 * 1、SimpleChannelInboundHandler是ChannelInboundHandlerAdapter模板的子类,方法更丰富
 * 2、HttpObject客户端服务器端的数据呗封装成HttpObject,相当于我们自己代码中对传送数据进行R封装一个道理
 */

public class TesthttpServerhandler extends SimpleChannelInboundHandler<HttpObject> {
    //channelRead0读物客户端数据
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println("tag1");
        //判断msg是不是httprequest
        if(msg instanceof HttpRequest){
            System.out.println("httpObject类型 = "+msg.getClass());
            System.out.println("客户端地址"+ctx.channel().remoteAddress());

            System.out.println("msg 类型:"+msg.getClass());
            System.out.println("客户端地址:"+ctx.channel().remoteAddress());
            // 给浏览器回复信息 [http协议]
            ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器...", CharsetUtil.UTF_16);
            // 构造http响应,httpresponse
            DefaultHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
            httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plaini");
            httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
            // 发送构建好的 response
            ctx.writeAndFlush(httpResponse);
        }
    }
}

Netty核心组件

在这里插入图片描述

bootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)       //设置线程队列的连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE,true)       //设置保持活动连接状态
                    .childHandler(new TestServerInitializer());

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

最后

以上就是娇气小兔子为你收集整理的Netty-IO模型的全部内容,希望文章能够帮你解决Netty-IO模型所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(38)

评论列表共有 0 条评论

立即
投稿
返回
顶部