我是靠谱客的博主 拼搏金鱼,最近开发中收集的这篇文章主要介绍Netty核心原理一、Netty是什么二、Reactor模型三、Netty线程模型四、Netty核心API五、实现Netty服务端和客户端,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

目录

一、Netty是什么

二、Reactor模型

1、单Reactor单线程

2、单Reactor多线程

3、主从Reactor多线程 

三、Netty线程模型

1、从简理解:Boss和Worker线程

2、进阶详细学习:Boss和Worker线程池

四、Netty核心API

1、事件处理器:ChannelHandler接口及其实现类

2、事件处理链表:ChannelPipeline接口

3、链表中的事件结点:ChannelHandlerContext

4、创建初始化通道对象:ChannelInitializer类

5、参数设置:ChannelOption

6、获取异步处理状态:ChannelFuture接口

7、创建线程组:EventLoopGroup

8、端启动助手及设置线程组:ServerBootstrap和Bootstrap

9、缓冲区工具类:Unpooled类

五、实现Netty服务端和客户端

1、服务端实现:NettyServer

2、客户端实现:NettyClient


NIO学习完,发现一个问题就是---NIO的类库和API也太繁杂了!使用起来就很麻烦,几个小demo给我搞得眼花缭乱....

这时Netty出现了。


一、Netty是什么

Netty是一个基于NIO的网络编程框架。它提供异步的、基于事件驱动的网络应用程序框
架,用以快速开发高性能、高可靠性的网络 IO 程序。


二、Reactor模型

1、单Reactor单线程

图为单线程的Reator模型(我理解的)。

2、单Reactor多线程

多线程情况下也不难,只是多了几个Hardler,而Harlder只负责响应事件(read和send)而不做具体的业务处理,它会麻烦worker线程池,找一个worker线程去处理业务,read后分发任务给线程处理,处理完返回给Hardler,然后send给client。

会突然发现这个Reactor模型好眼熟,不觉得很像NIO吗?其实两者机制很像,可以理解为Reactor模型是基于NIO产生的(可以勉强这么理解,实际上这两个东西的性质不统一)

3、主从Reactor多线程 

这回就更复杂了,Reactor线程变成了两个。但其实机制和上面的多线程状态也差不多:此时Reactor主线程只负责监听客户端连接请求,和客户端建立连接之后就将连接交由Reactor子线程监听后面的 IO 事件。

  1. Reactor主线程对象监听客户端连接事件
  2. 收到事件后,通过Acceptor处理客户端连接事件
  3. 当Acceptor连接好后,Reactor主线程将连接分配给Reactor子线程
  4. 子线程将连接加入自己的连接队列监听,并创建Handler对各种事件进行处理
  5. 当连接上有新事情发生时,子线程调用对应的Handler处理
  6. Handler通过read从连接上读取请求数据,将请求发给 Worker 线程池进行业务处理
  7. 可怜的worker线程终于处理好业务了,返回给Handler,然后Handler用send返回给客户端

一个 Reactor主线程 可以对应多个 Reactor子线程

其实说白了就是一个接着一个丢锅都不愿意干活,最后层层传递交给最底层的worker线程干。


三、Netty线程模型

又回归到我们的Netty。前面为什么介绍了Reactor模型呢?因为Netty基于主从 Reactor 多线程模式

1、从简理解:Boss和Worker线程

  1. BossGroup 线程(类似于主 Reactor)中维护 Selector,ServerSocketChannel 注册到Selector上,只关注建立连接请求事件;
  2. 当接收到来自客户端的连接时,通过 ServerSocketChannel.accept ()获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程(类似于从 Reactor)中的Selector(每个 Selector 运行在一个线程中);
  3. 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的事就调用 Handler 处理

2、进阶详细学习:Boss和Worker线程池

抽象出了两组线程池:BossGroup线程池中的线程负责和客户端建立连接WorkerGroup线程池中的的线程负责处理连接上的读写

两个线程池的类型都是NioEventLoopGroup(事件循环组)。事件循环线程池中的每个线程都配有一个selector用于监听管道Channa上l是否有连接。

BossGroup循环执行以下三个步骤:

  1. select:轮询连接事件(OP_ACCEPT事件
  2. processSelectedKeys事件:处理连接事件,与客户端连接生成NioSocketChannal,并将其注册到WorkerGroup的selector上
  3. runAllTasks:再以此循环处理事件队列中的其他事件

 WorkerGroup循环执行以下三个步骤:

  1. select:轮询注册了的NioSocketChannal的读写事件(OP_READ和OP_WRITE事件
  2. processSelectorKeys事件:处理读写事件
  3. runAllTasks:再以此循环处理事件队列中的其他事件

四、Netty核心API

1、事件处理器:ChannelHandler接口及其实现类

ChannelHandler接口下定义了很多事件处理的方法。当我们使用Netty进行网络编程时,需要自定义一个Handler类实现ChannelHandler接口或其实现类,然后重写对应方法实现业务逻辑。

  • public void channelActive(ChannelHandlerContext ctx),通道就绪事件
  • public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
  • public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
  • public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件

2、事件处理链表:ChannelPipeline接口

 ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将pipeline附加到channel中。

3、链表中的事件结点:ChannelHandlerContext

ChannelHandlerContext是Pipeline链中的实际处理结点。ChannelPipeline并不是直接管理ChannelHandler,而是通过ChannelHandlerContext来间接管理。

每一个ChannelHandlerContext都包含一个事件处理器ChannelHandler,并且绑定对应的channel和ChannelPipeline,从而方便对ChannelHandler进行调用。

  • ChannelFuture close(),关闭通道
  • ChannelOutboundInvoker flush(),刷新
  • ChannelFuture writeAndFlush(Object msg) , 将数据写到 ChannelPipeline 中,ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

4、创建初始化通道对象:ChannelInitializer类

我们可以通过重写ChannelInitializer类中的initChannel方法创建一个通道初始化对象:

 // 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {  
      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
          // 向pipeline中添加自定义业务处理handler
          ch.pipeline().addLast(new NettyServerHandle())
      }
});

OK停一下,这时候有点乱糟糟----ChannelChannelbalabla太多了,我梳理了一下上面四个接口类在实际链路中的关系:

 一个Channel对应一个ChannelPipeline,而ChannelPipeline中包含的是ChannelHardleContext所组成的双向链表。我们暂且将这条链表分为head、body、tail三个部分。在body中,每个ChannelHardlerContext对应关联一个ChannelHardler,如果想要向整个链表中添加新的事件处理Handler,就可以重写ChannelInitialize类中的initChannel方法(代码见第4点)向链表中添加新的Channel。

再看到链表的头和尾head和tail。head和tail并没有包含ChannelHandler,是因为他俩继承了AbstractChannelHandlerContext同时实现了ChannelHandler接口,所以它们有了Context和Handler双重属性。head靠近网络端,tail靠近应用端。入站时从head入,出站时从tail出。

5、参数设置:ChannelOption

Netty在创建Channel实例后都需要设置ChannelOption参数,它是Socket的标准参数:

  • ChannelOption.SO_BACKLOG:初始化服务器可连接等待队列的大小
  • ChannelOption.SO_KEEPALIVE:一直保持TCP连接活动状态

6、获取异步处理状态:ChannelFuture接口

异步和同步相对。Netty中所有IO操作都是异步的。在异步模式下,调用者无法立刻获得处理状态。但我们可以通过 Future-Listener 机制(给Future添加监听器),主动获取IO操作结果。

我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。

ChannelFuture接口是Future接口的一个子接口,保存Channel异步操作的结果,addListener方法可以添加监听器。 常用方法如下:

  • Channel channel(),返回当前正在进行 IO 操作的通道
  • ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步
  • addListener(),注册监听器,当操作或Future对象已完成(isDone返回ture),将会通知指定的监听器
  • isDone(),判断当前操作是否完成
  • isSuccess(),判断已完成的当前操作是否成功
  • isCancelled(),判断已完成的当前操作是否被取消
  • getCause(),获取已完成的当前操作失败的原因
  • closeFuture(),关闭通道
  • Future-Listener模式

在启动客户端和服务端时,我们可以给ChannelFuture对象添加一个监听器:调用addListener方法,然后new一个ChannelFutureListener类并重写其中的operationComplete方法。判断条件为future.isSuccess()的布尔值:

// 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
/**
* Future - Listener
* */
future.addListener(new ChannelFutureListener() {
       @Override
       public void operationComplete(ChannelFuture future) throws Exception {
           if (future.isSuccess()) {
               System.out.println("数据发送成功.");
           } else {
               System.out.println("数据发送失败.");
           }
       }
});

7、创建线程组:EventLoopGroup

EventLoopGroup就是事件循环线程EventLoop的合集,可以用它创建像BossGroup和WorkerGroup这样的事件循环线程组。

EventLoopGroup提供了next接口,用于获取LoopGroup去处理任务。一般会有多个LoopGroup同时工作,且一个EvenGroup对应一个selector用于监听请求和事件。

一般Netty服务器端编程都会有两个EvenLoopGroup:Boss和Worker。

常用其实现类NioEventLoopGroup:

  • public NioEventLoopGroup(),构造方法,创建线程组
  • public Future<?> shutdownGracefully(),断开连接,关闭线程 

8、端启动助手及设置线程组:ServerBootstrap和Bootstrap

启动助手。ServerBootstrap是服务器端的,Bootstrap是客户端的,通过它俩可以完成对应端的各种配置。

首先创建一个启动助手(以服务端举例):

ServerBootstrap server = new ServerBootstrap();

服务端启动助手创建完成,接下来可以使用以下方法对此启动助手进行设置:

  • public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup), 服务器端设置两个EventLoop
  • public B group(EventLoopGroup group) ,客户端设置一个 EventLoop
  • public B channel(Class<? extends C> channelClass),设置一个服务器端的通道实现
  • public B option(ChannelOption参数, T value),给服务器通道添加配置
  • public ServerBootstrap childOption(ChannelOption参数, T value),给接收到的通道添加配置
  • public ServerBootstrap childHandler(ChannelHandler childHandler),设置业务处理类(自定义的 handler)
  • public ChannelFuture bind(int inetPort) ,服务器端设置占用的端口号
  • public ChannelFuture connect(String inetHost, int inetPort) ,客户端连接服务器端

使用如下,使用.操作符连续调用方法即可(还是server举例):

server.group(bossGroup, workerGroup)  // 设置线程组
         .channel(NioServerSocketChannel.class)      // 设置服务端通道实现;
         .option(ChannelOption.SO_BACKLOG, 128)      // 参数设置-设置线程队列中等待连接个数
         .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)     // 参数设置-设置活跃状态,child是设置workerGroup
         .childHandler(new ChannelInitializer<SocketChannel>() {   // 创建一个通道初始化对象
             @Override
             protected void initChannel(SocketChannel ch) throws Exception {
                 // 向pipeline中添加自定义业务处理handler
                 ch.pipeline().addLast(new NettyServerHandle());
             }
         });

9、缓冲区工具类:Unpooled类

用来操作缓冲区

  • public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
  • 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer对象) 

五、实现Netty服务端和客户端

1、服务端实现:NettyServer

  1. 分别创建Boss和Worker线程组
  2. 然后创建服务端启动助手
  3. 接着对启动助手调用方法:设置线程组、设置服务端channel实现,使用参数设置线程队列中的等待连接个数并设为活跃状态,以及创建一个通道初始化对象并向pipeline中添加自定义业务处理handler
  4. 此时启动助手设置就绪。启动服务端并绑定端口,将异步设为同步
  5. 最后关闭通道,关闭连接池
/**
 * Netty服务端
 */
public class NettyServer {
    public static void main(String[] args) throws InterruptedException {
        //1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);

        //2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //3.创建服务端启动助手
        ServerBootstrap bootstrap = new ServerBootstrap();

        //4.设置线程组
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)      //5.设置服务端通道实现;
                .option(ChannelOption.SO_BACKLOG, 128)      //6.参数设置-设置线程队列中等待连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)     //7.参数设置-设置活跃状态,child是设置workerGroup
                .childHandler(new ChannelInitializer<SocketChannel>() {   //8.创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //9.向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyServerHandle());
                    }
                });

        //10.启动服务端并绑定端口,同时将异步改为同步
        ChannelFuture future = bootstrap.bind(9999).sync();
        System.out.println("服务器启动成功....");
        /**
         * Future - Listener
         * */
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("端口绑定成功!");
                } else {
                    System.out.println("端口绑定失败!");
                }
            }
        });

        //11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

其中自定义业务处理NettyServerHandler类实现如下:

public class NettyServerHandle implements ChannelInboundHandler {
    /**
     * 通道读取事件
     *
     * @param ctx 通道上下文对象
     * @param msg 消息
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("客户端发来消息:" +
                byteBuf.toString(CharsetUtil.UTF_8));
    }
    /**
     * 读取完毕事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
    {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.",CharsetUtil.UTF_8));
    }
    /**
     * 异常发生事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
    {
    }
    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws
            Exception {
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
            Exception {
    }@Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }
}

2、客户端实现:NettyClient

和服务器端实现思路很像。只不过客户端只需要设置一个线程组即可。

  1. 创建线程组
  2. 创建客户端启动助手
  3. 对启动助手调用方法:设置线程组、设置服务端channel实现,使用参数设置线程队列中的等待连接个数并设为活跃状态,以及创建一个通道初始化对象并向pipeline中添加自定义业务处理handler
  4. 启动客户端,等待服务端连接,同时将异步改为同步
  5. 最后关闭通道,关闭连接池
/**
 * Netty客户端
 */
public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        //1. 创建线程组
        EventLoopGroup group = new NioEventLoopGroup();
        //2. 创建客户端启动助手
        Bootstrap bootstrap = new Bootstrap();
        //3. 设置线程组
        bootstrap.group(group)
                .channel(NioSocketChannel.class)    //4. 设置服务端通道实现为NIO
                .handler(new ChannelInitializer<SocketChannel>() {   //5. 创建一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //6. 向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyClientHandle());
                    }});

        //7. 启动客户端, 等待连接服务端, 同时将异步改为同步
        ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
        /**
         * Future - Listener
         * */
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("数据发送成功.");
                } else {
                    System.out.println("数据发送失败.");
                }
            }
        });

        //8. 关闭通道和关闭连接池
        future.channel().closeFuture().sync();
        group.shutdownGracefully();
    }
}

其中自定义业务处理NettyClientHandler类实现如下:

/**
 * 客户端处理类
 */
public class NettyClientHandle implements ChannelInboundHandler {
    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
    {
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }
}

当两端达成连接时,输出为:

 

最后

以上就是拼搏金鱼为你收集整理的Netty核心原理一、Netty是什么二、Reactor模型三、Netty线程模型四、Netty核心API五、实现Netty服务端和客户端的全部内容,希望文章能够帮你解决Netty核心原理一、Netty是什么二、Reactor模型三、Netty线程模型四、Netty核心API五、实现Netty服务端和客户端所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(42)

评论列表共有 0 条评论

立即
投稿
返回
顶部