概述
Netty的介绍
- Netty是由JBOSS提供的一个Java的开源框架。
- Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络IO程序
- Netty主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用
- Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景
- 要透彻理解Netty,需要先学习NIO
原生Java-NIO存在的问题
- NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
- 需要具备其他的额外技能:要熟悉Java多线程编程,因为NIO编程设计到Reactor模式,必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序。
- 开发工作量和难度非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等。
- JDK NIO的Bug:臭名昭著的Epoll Bug,会导致Selector空轮循,最终导致CPU100%,知道JDK1.7该问题依旧存在,没有被根本解决
Netty的优点
- 设计优雅:适用于各种传输类型的统一API阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰的分离关注点;高度可定制的线程模型-单线程,一个或多个线程池。
- 使用方便:详细记录的Javadoc,用户指南和示例:没有吧其他的依赖。
- 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
- 安全:完整的SSL/TLS和StartTLS支持
- 社区活跃、不断更新。
Reactor模式
I/O复用结合线程池,就是Reactor模式基本设计思想,如图:
说明:
- Reactor模式,通过一个或多个输入同时传递给服务器的模型(基于事件驱动)
- 服务器程序处理传入的多个请求,并将他们同步分派到响应的处理线程,因此Reactor模式也叫分发模式
- Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程)这点就是网络服务器高并发处理关键
Reactor模式的核心组成:
Reactor:Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
Handlers:处理程序执行IO事件要完成的实际事件。
单Reactor单线程模式
单Reactor多线程模式
方案优缺点:
优点:可以充分的利用多核cpu的处理能力
缺点:线程池中的多线程数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在单线程中运行,高并发场景下容易出现性能瓶颈
主从Reactor多线程模式
针对前面单个Reactor多线程模型中,Reactor在单线程中运行,高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行
方案说明:
- Reactor主线程MainReactor对象通过seclect监听连接事件,收到事件后,通过Acceptor处理连接事件
- 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor
- SubReactor将连接加入到连接队列进行监听,并创建handler进行各种事件处理
- 当有新的事件发生时,SubReactor就会调用对应的handler线程处理
- handler通过read读取数据,分发给后面的worker线程处理
- worker线程池分配独立的worker线程进行具体的业务处理,并返回结果
- handler收到响应的结果后,在通过send将结果返回给client
- Reactor主线程可以对应多个Reactor子线程,即MainReactor可以关联多个SubReactor
主从Reactor优缺点:
优点:
(1)父线程与子线程的数据交互简单职责明确,父线程只需接收新连接,子线程完成后续的业务处理
(2)Reactor主线程只需要接收新连接传给子线程,子线程无需返回数据
缺点:
编程复杂度较高
Netty模型
工作原理说明:
(1)Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkGroup专门负责网络的读写
(2)BossGroup和WorkerGroup类型都是NioEventLoopGroup
(3)NIOEventLoopGroup相当于一个事件循环组,这个组中含有多个时间循环,每一个时间循环是NIOEventLoop
(4)NIOEventLoop表示一个不断循环的执行处理任务的线程,每个NIOEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
(5)NIOEventLoopGroup可以有多个线程,即可以含有多个NIOEventLoop
(6)每个Boss NIOEventLoopGroup循环执行的步骤有3步
1、 轮询accept事件
2、 处理accept事件,与client建立连接,生成NIOSocketChannel,并将其注册到某个Worker NIOEventLoop上的Seclector
3、 处理任务队列的任务,即runAllTasks
(7)每个Worker NIOEventLoop 循环执行步骤
1、 轮询read / write事件
2、 处理IO事件,在对应的NIOSocketChannel处理
3、 处理任务队列的任务,即runAllTasks
(8)每个WorkerNIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了Channel,即通过pipeline可以获得对应的通道。
简单来看Netty这个模型中包含这样两组对象:
BossGroup(NIOEventLoopGroup)、WorkerGroup(NIOEventLoopGroup)
这两个对象中包含多个具体的NioEventLoop对象
NioEventLoop对象中包含Selector对象和Taskqueue队列
这两个对象通过Channel对象来连接,Channel对象在连接workergroup组中的对象时,与pipeline对象互相包含,并且添加了自定义的handler对象(用来具体处理消息的对象)
案例源码分析Netty模型:
我们主要通过案例来debug观察模型的具体实现,主要观察以下几个问题:
(1)我们验证模型图的上图这个部分,BossGroup和WorkerGroup本质都是NIOEventLoopGroup,其子线程数默认是cpu核数*2
如下图所示,可以通过初始化设置子线程数,也就是Group的大小,Group中是多个NIOEventLoop对象
(2)我们验证模型图的上图这个部分,看一下NioEventLoop对象的结构,每个NioEventLoop对象都有单独的selector和TaskQueue队列,同样的,bossGroup和workerGroup结构都是一样的,只不过职责不同而已,所以workerGroup中是多个NioEventLoop对象,也就增强了并发能力。
(3)WorkGroup中默认为8个线程,客户端请求在8个线程中怎么轮询分配
我们在NettyServerhandler对象的read函数中输出当前线程名来进行验证
(4)我们验证模型图的上图这个部分,我们通过handler中read函数中的ctx上下文来输出看一下Channel和pipeline这两个对象的结构。pipeline和channel是相互包含的关系。并且全部包含在ctx这个对象中。
pipeline的结构,其中可以获取到对应的channel
Channel的结构,同样可以获取到pipeline,还可以获取到客户端地址和服务端地址,以及当前所在的eventLoop对象
简单的案例代码如下:共4部分,NettyServer、NettyClient、NettyServerhandler、NettyClienthandler
NettyServer:
//NettyServer
package com.sgg.Netty.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建BossGroup和WorkerGroup
//说明
//1、创建两个线程组BossGroup和WorkerGroup
//2、BossGroup只负责处理请求,真正和客户端的业务处理会交给WorkerGroup
//3、两个都是无限循环
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建服务端的启动对象,配置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式变成来进行设置
bootstrap.group(bossGroup,workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG,128) //设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道测试对象
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerhandler()); //向这个管道增加handler处理器
}
});
System.out.println(".....服务器准备好了");
//绑定一个端口并且同步,生成一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}
}
NettyClient:
//NettyClient
package com.sgg.Netty.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//客户端需要一个事件循环组
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
//创建客户端启动对象
//注意客户端不是ServerBootstrap,是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClienthandler());//加入自己的handler
}
});
System.out.println("客户端OK");
//启动客户端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
//关闭
eventLoopGroup.shutdownGracefully();
}
}
}
NettyServerhandler
//NettyServerhandler
package com.sgg.Netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
* 我们自定义一个handler,相当于我们自定义了一个消息的处理对象,
* 但是在netty中,我们想实现这个handler,需要继承某个netty规定好的模板
*
* 这样我们定义的handler才可以使用,具备handler的功能
*/
public class NettyServerhandler extends ChannelInboundHandlerAdapter {
//重写读取数据的方法
/**
*
* @param ctx 上下文对象,含有管道pipeline、通道、地址
* @param msg 客户端发送的数据(object形式)
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器端线程:"+Thread.currentThread().getName());
System.out.println("server ctx = "+ctx);
System.out.println("Channel和pipeline的结构如下");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表
//将msg转换成ByteBuf,这个buffer类是Netty提供的,并不是NIO的buffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写到缓存并刷新到你客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
/**
* 处理异常,一般需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyServerhandler
//NettyServerhandler
package com.sgg.Netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
/**
* 我们自定义一个handler,相当于我们自定义了一个消息的处理对象,
* 但是在netty中,我们想实现这个handler,需要继承某个netty规定好的模板
*
* 这样我们定义的handler才可以使用,具备handler的功能
*/
public class NettyServerhandler extends ChannelInboundHandlerAdapter {
//重写读取数据的方法
/**
*
* @param ctx 上下文对象,含有管道pipeline、通道、地址
* @param msg 客户端发送的数据(object形式)
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器端线程:"+Thread.currentThread().getName());
System.out.println("server ctx = "+ctx);
System.out.println("Channel和pipeline的结构如下");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表
//将msg转换成ByteBuf,这个buffer类是Netty提供的,并不是NIO的buffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端的地址是:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完毕
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将数据写到缓存并刷新到你客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,客户端~",CharsetUtil.UTF_8));
}
/**
* 处理异常,一般需要关闭通道
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
NettyClienthandler
package com.sgg.Netty.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClienthandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client"+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello,服务端~", CharsetUtil.UTF_8));
}
//当通道有读取事件时,会触发该方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:"+buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址是:"+ctx.channel().remoteAddress());
}
//异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
Netty异步模型
Future-Listener机制:
(1)当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作的执行状态,注册监听函数来执行完成后的操作。
(2)常见有如下操作:
- 通过isDone来判断当前操作是否完成;
- 通过isSuccess来判断操作是否成功;
- 通过getCause来获取已完成的当前操作失败的原因; 通过isCancelled来判断已完成的操作是否呗取消;
- 通过addListener来注册监听器,当操作已完成,将会通知指定的监听器,如果Future对象已完成,则通知指定的监听器
//Future-Listener机制
//第一种写法
//绑定一个端口并且同步,生成一个ChannelFuture对象
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(cf.isSuccess()){
System.out.println("监听端口成功");
}else{
System.out.println("监听端口失败");
}
}
});
//第二种写法
//绑定一个端口并且同步,生成一个ChannelFuture对象
bootstrap.bind(6668).addListener(future->{
if(cf.isSuccess()){
System.out.println("监听端口成功");
}else{
System.out.println("监听端口失败");
}
});
(3)相比传统阻塞IO,执行IO操作后线程会被阻塞住,知道操作完成;异步处理的好处是不会造成线程阻塞,线程在IO操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
快速入门实例-HTTP服务
我们实现一个HTTP的服务器不需要写客户端,直接从浏览器访问即可,所以我们的服务端包含3个文件,分别是TestServer、TestServerInitializer、TesthttpServerhandler
TestServer:
package com.sgg.Netty.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TestServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128) //设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态
.childHandler(new TestServerInitializer());
System.out.println(".....服务器准备好了");
ChannelFuture cf = bootstrap.bind(8849).sync();
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(cf.isSuccess()){
System.out.println("监听端口成功");
}else{
System.out.println("监听端口失败");
}
}
});
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TestServerInitializer
package com.sgg.Netty.http;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = socketChannel.pipeline();
//加入一个netty提供的httpServerCodec codec =>[coder - decoder]
//1、HttpServerCodec是Netty提供的处理http的编-解码器
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
//2、增加一个自定义的handler
pipeline.addLast("MyTesthttpServerhandler",new TesthttpServerhandler());
}
}
TesthttpServerhandler
package com.sgg.Netty.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
* 1、SimpleChannelInboundHandler是ChannelInboundHandlerAdapter模板的子类,方法更丰富
* 2、HttpObject客户端服务器端的数据呗封装成HttpObject,相当于我们自己代码中对传送数据进行R封装一个道理
*/
public class TesthttpServerhandler extends SimpleChannelInboundHandler<HttpObject> {
//channelRead0读物客户端数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
System.out.println("tag1");
//判断msg是不是httprequest
if(msg instanceof HttpRequest){
System.out.println("httpObject类型 = "+msg.getClass());
System.out.println("客户端地址"+ctx.channel().remoteAddress());
System.out.println("msg 类型:"+msg.getClass());
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
// 给浏览器回复信息 [http协议]
ByteBuf content = Unpooled.copiedBuffer("hello,我是服务器...", CharsetUtil.UTF_16);
// 构造http响应,httpresponse
DefaultHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plaini");
httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
// 发送构建好的 response
ctx.writeAndFlush(httpResponse);
}
}
}
Netty核心组件
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128) //设置线程队列的连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true) //设置保持活动连接状态
.childHandler(new TestServerInitializer());
最后
以上就是娇气小兔子为你收集整理的Netty-IO模型的全部内容,希望文章能够帮你解决Netty-IO模型所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复