概述
目录
一、Netty是什么
二、Reactor模型
1、单Reactor单线程
2、单Reactor多线程
3、主从Reactor多线程
三、Netty线程模型
1、从简理解:Boss和Worker线程
2、进阶详细学习:Boss和Worker线程池
四、Netty核心API
1、事件处理器:ChannelHandler接口及其实现类
2、事件处理链表:ChannelPipeline接口
3、链表中的事件结点:ChannelHandlerContext
4、创建初始化通道对象:ChannelInitializer类
5、参数设置:ChannelOption
6、获取异步处理状态:ChannelFuture接口
7、创建线程组:EventLoopGroup
8、端启动助手及设置线程组:ServerBootstrap和Bootstrap
9、缓冲区工具类:Unpooled类
五、实现Netty服务端和客户端
1、服务端实现:NettyServer
2、客户端实现:NettyClient
NIO学习完,发现一个问题就是---NIO的类库和API也太繁杂了!使用起来就很麻烦,几个小demo给我搞得眼花缭乱....
这时Netty出现了。
一、Netty是什么
Netty是一个基于NIO的网络编程框架。它提供异步的、基于事件驱动的网络应用程序框
架,用以快速开发高性能、高可靠性的网络 IO 程序。
二、Reactor模型
1、单Reactor单线程
图为单线程的Reator模型(我理解的)。
2、单Reactor多线程
多线程情况下也不难,只是多了几个Hardler,而Harlder只负责响应事件(read和send)而不做具体的业务处理,它会麻烦worker线程池,找一个worker线程去处理业务,read后分发任务给线程处理,处理完返回给Hardler,然后send给client。
会突然发现这个Reactor模型好眼熟,不觉得很像NIO吗?其实两者机制很像,可以理解为Reactor模型是基于NIO产生的(可以勉强这么理解,实际上这两个东西的性质不统一)
3、主从Reactor多线程
这回就更复杂了,Reactor线程变成了两个。但其实机制和上面的多线程状态也差不多:此时Reactor主线程只负责监听客户端连接请求,和客户端建立连接之后就将连接交由Reactor子线程监听后面的 IO 事件。
- Reactor主线程对象监听客户端连接事件
- 收到事件后,通过Acceptor处理客户端连接事件
- 当Acceptor连接好后,Reactor主线程将连接分配给Reactor子线程
- 子线程将连接加入自己的连接队列监听,并创建Handler对各种事件进行处理
- 当连接上有新事情发生时,子线程调用对应的Handler处理
- Handler通过read从连接上读取请求数据,将请求发给 Worker 线程池进行业务处理
- 可怜的worker线程终于处理好业务了,返回给Handler,然后Handler用send返回给客户端
一个 Reactor主线程 可以对应多个 Reactor子线程
其实说白了就是一个接着一个丢锅都不愿意干活,最后层层传递交给最底层的worker线程干。
三、Netty线程模型
又回归到我们的Netty。前面为什么介绍了Reactor模型呢?因为Netty基于主从 Reactor 多线程模式。
1、从简理解:Boss和Worker线程
- BossGroup 线程(类似于主 Reactor)中维护 Selector,ServerSocketChannel 注册到Selector上,只关注建立连接请求事件;
- 当接收到来自客户端的连接时,通过 ServerSocketChannel.accept ()获得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程(类似于从 Reactor)中的Selector(每个 Selector 运行在一个线程中);
- 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的事就调用 Handler 处理
2、进阶详细学习:Boss和Worker线程池
抽象出了两组线程池:BossGroup线程池中的线程负责和客户端建立连接,WorkerGroup线程池中的的线程负责处理连接上的读写。
两个线程池的类型都是NioEventLoopGroup(事件循环组)。事件循环线程池中的每个线程都配有一个selector用于监听管道Channa上l是否有连接。
BossGroup循环执行以下三个步骤:
- select:轮询连接事件(OP_ACCEPT事件)
- processSelectedKeys事件:处理连接事件,与客户端连接生成NioSocketChannal,并将其注册到WorkerGroup的selector上
- runAllTasks:再以此循环处理事件队列中的其他事件
WorkerGroup循环执行以下三个步骤:
- select:轮询注册了的NioSocketChannal的读写事件(OP_READ和OP_WRITE事件)
- processSelectorKeys事件:处理读写事件
- runAllTasks:再以此循环处理事件队列中的其他事件
四、Netty核心API
1、事件处理器:ChannelHandler接口及其实现类
ChannelHandler接口下定义了很多事件处理的方法。当我们使用Netty进行网络编程时,需要自定义一个Handler类实现ChannelHandler接口或其实现类,然后重写对应方法实现业务逻辑。
- public void channelActive(ChannelHandlerContext ctx),通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
2、事件处理链表:ChannelPipeline接口
ChannelPipeline类是ChannelHandler实例对象的链表,用于处理或截获通道的接收和发送数据。对于每个新的通道Channel,都会创建一个新的ChannelPipeline,并将pipeline附加到channel中。
3、链表中的事件结点:ChannelHandlerContext
ChannelHandlerContext是Pipeline链中的实际处理结点。ChannelPipeline并不是直接管理ChannelHandler,而是通过ChannelHandlerContext来间接管理。
每一个ChannelHandlerContext都包含一个事件处理器ChannelHandler,并且绑定对应的channel和ChannelPipeline,从而方便对ChannelHandler进行调用。
- ChannelFuture close(),关闭通道
- ChannelOutboundInvoker flush(),刷新
- ChannelFuture writeAndFlush(Object msg) , 将数据写到 ChannelPipeline 中,ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
4、创建初始化通道对象:ChannelInitializer类
我们可以通过重写ChannelInitializer类中的initChannel方法创建一个通道初始化对象:
// 创建一个通道初始化对象
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle())
}
});
OK停一下,这时候有点乱糟糟----ChannelChannelbalabla太多了,我梳理了一下上面四个接口类在实际链路中的关系:
一个Channel对应一个ChannelPipeline,而ChannelPipeline中包含的是ChannelHardleContext所组成的双向链表。我们暂且将这条链表分为head、body、tail三个部分。在body中,每个ChannelHardlerContext对应关联一个ChannelHardler,如果想要向整个链表中添加新的事件处理Handler,就可以重写ChannelInitialize类中的initChannel方法(代码见第4点)向链表中添加新的Channel。
再看到链表的头和尾head和tail。head和tail并没有包含ChannelHandler,是因为他俩继承了AbstractChannelHandlerContext同时实现了ChannelHandler接口,所以它们有了Context和Handler双重属性。head靠近网络端,tail靠近应用端。入站时从head入,出站时从tail出。
5、参数设置:ChannelOption
Netty在创建Channel实例后都需要设置ChannelOption参数,它是Socket的标准参数:
- ChannelOption.SO_BACKLOG:初始化服务器可连接等待队列的大小
- ChannelOption.SO_KEEPALIVE:一直保持TCP连接活动状态
6、获取异步处理状态:ChannelFuture接口
异步和同步相对。Netty中所有IO操作都是异步的。在异步模式下,调用者无法立刻获得处理状态。但我们可以通过 Future-Listener 机制(给Future添加监听器),主动获取IO操作结果。
我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。
ChannelFuture接口是Future接口的一个子接口,保存Channel异步操作的结果,addListener方法可以添加监听器。 常用方法如下:
- Channel channel(),返回当前正在进行 IO 操作的通道
- ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步
- addListener(),注册监听器,当操作或Future对象已完成(isDone返回ture),将会通知指定的监听器
- isDone(),判断当前操作是否完成
- isSuccess(),判断已完成的当前操作是否成功
- isCancelled(),判断已完成的当前操作是否被取消
- getCause(),获取已完成的当前操作失败的原因
- closeFuture(),关闭通道
-
Future-Listener模式
在启动客户端和服务端时,我们可以给ChannelFuture对象添加一个监听器:调用addListener方法,然后new一个ChannelFutureListener类并重写其中的operationComplete方法。判断条件为future.isSuccess()的布尔值:
// 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
/**
* Future - Listener
* */
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
7、创建线程组:EventLoopGroup
EventLoopGroup就是事件循环线程EventLoop的合集,可以用它创建像BossGroup和WorkerGroup这样的事件循环线程组。
EventLoopGroup提供了next接口,用于获取LoopGroup去处理任务。一般会有多个LoopGroup同时工作,且一个EvenGroup对应一个selector用于监听请求和事件。
一般Netty服务器端编程都会有两个EvenLoopGroup:Boss和Worker。
常用其实现类NioEventLoopGroup:
- public NioEventLoopGroup(),构造方法,创建线程组
- public Future<?> shutdownGracefully(),断开连接,关闭线程
8、端启动助手及设置线程组:ServerBootstrap和Bootstrap
启动助手。ServerBootstrap是服务器端的,Bootstrap是客户端的,通过它俩可以完成对应端的各种配置。
首先创建一个启动助手(以服务端举例):
ServerBootstrap server = new ServerBootstrap();
服务端启动助手创建完成,接下来可以使用以下方法对此启动助手进行设置:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroupchildGroup), 服务器端设置两个EventLoop
- public B group(EventLoopGroup group) ,客户端设置一个 EventLoop
- public B channel(Class<? extends C> channelClass),设置一个服务器端的通道实现
- public B option(ChannelOption参数, T value),给服务器通道添加配置
- public ServerBootstrap childOption(ChannelOption参数, T value),给接收到的通道添加配置
- public ServerBootstrap childHandler(ChannelHandler childHandler),设置业务处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) ,服务器端设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) ,客户端连接服务器端
使用如下,使用.操作符连续调用方法即可(还是server举例):
server.group(bossGroup, workerGroup) // 设置线程组
.channel(NioServerSocketChannel.class) // 设置服务端通道实现;
.option(ChannelOption.SO_BACKLOG, 128) // 参数设置-设置线程队列中等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 参数设置-设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle());
}
});
9、缓冲区工具类:Unpooled类
用来操作缓冲区
- public static ByteBuf copiedBuffer(CharSequence string, Charset charset)
- 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer对象)
五、实现Netty服务端和客户端
1、服务端实现:NettyServer
- 分别创建Boss和Worker线程组
- 然后创建服务端启动助手
- 接着对启动助手调用方法:设置线程组、设置服务端channel实现,使用参数设置线程队列中的等待连接个数并设为活跃状态,以及创建一个通道初始化对象并向pipeline中添加自定义业务处理handler
- 此时启动助手设置就绪。启动服务端并绑定端口,将异步设为同步
- 最后关闭通道,关闭连接池
/**
* Netty服务端
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建bossGroup线程组: 处理网络事件--连接事件 线程数默认为: 2 * 处理器线程数
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3.创建服务端启动助手
ServerBootstrap bootstrap = new ServerBootstrap();
//4.设置线程组
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) //5.设置服务端通道实现;
.option(ChannelOption.SO_BACKLOG, 128) //6.参数设置-设置线程队列中等待连接个数
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) //7.参数设置-设置活跃状态,child是设置workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() { //8.创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//9.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandle());
}
});
//10.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture future = bootstrap.bind(9999).sync();
System.out.println("服务器启动成功....");
/**
* Future - Listener
* */
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("端口绑定成功!");
} else {
System.out.println("端口绑定失败!");
}
}
});
//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
其中自定义业务处理NettyServerHandler类实现如下:
public class NettyServerHandle implements ChannelInboundHandler {
/**
* 通道读取事件
*
* @param ctx 通道上下文对象
* @param msg 消息
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发来消息:" +
byteBuf.toString(CharsetUtil.UTF_8));
}
/**
* 读取完毕事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception
{
ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端.",CharsetUtil.UTF_8));
}
/**
* 异常发生事件
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
}
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws
Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws
Exception {
}@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
2、客户端实现:NettyClient
和服务器端实现思路很像。只不过客户端只需要设置一个线程组即可。
- 创建线程组
- 创建客户端启动助手
- 对启动助手调用方法:设置线程组、设置服务端channel实现,使用参数设置线程队列中的等待连接个数并设为活跃状态,以及创建一个通道初始化对象并向pipeline中添加自定义业务处理handler
- 启动客户端,等待服务端连接,同时将异步改为同步
- 最后关闭通道,关闭连接池
/**
* Netty客户端
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//1. 创建线程组
EventLoopGroup group = new NioEventLoopGroup();
//2. 创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
//3. 设置线程组
bootstrap.group(group)
.channel(NioSocketChannel.class) //4. 设置服务端通道实现为NIO
.handler(new ChannelInitializer<SocketChannel>() { //5. 创建一个通道初始化对象
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//6. 向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyClientHandle());
}});
//7. 启动客户端, 等待连接服务端, 同时将异步改为同步
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
/**
* Future - Listener
* */
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("数据发送成功.");
} else {
System.out.println("数据发送失败.");
}
}
});
//8. 关闭通道和关闭连接池
future.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
其中自定义业务处理NettyClientHandler类实现如下:
/**
* 客户端处理类
*/
public class NettyClientHandle implements ChannelInboundHandler {
/**
* 通道就绪事件
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发来消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception
{
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
}
当两端达成连接时,输出为:
最后
以上就是拼搏金鱼为你收集整理的Netty核心原理一、Netty是什么二、Reactor模型三、Netty线程模型四、Netty核心API五、实现Netty服务端和客户端的全部内容,希望文章能够帮你解决Netty核心原理一、Netty是什么二、Reactor模型三、Netty线程模型四、Netty核心API五、实现Netty服务端和客户端所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复