概述
文章目录
- 1 Netty线程模型
- 1.1 传统阻塞 I/O 服务模型
- 1.2 Reactor线程模型
- 1.2.1 单 Reactor 单线程模型
- 1.2.2 单Reactor多线程
- 1.2.3 主从 Reactor 多线程
- 1.2.4 Reactor线程模型小结
- 1.3 Netty线程模型
- 1.3.1 简单版Netty模型
- 1.3.2 进阶版Netty模型
- 1.3.3 详细版Netty模型
- 2 Netty快速入门案例-TCP服务
- 2.1 服务端代码实现
- 2.2 客户端代码实现
- 3 Netty任务队列
- 3.1 用户自定义的普通任务
- 3.2 用户自定义定时任务
1 Netty线程模型
不同的线程模式,对程序的性能有很大影响,在学习Netty线程模式之前,首先需要了解下 各个线程模 式, 最后看看 Netty 线程模型有什么优越性。目前存在的线程模型有:
- 传统阻塞
I/O
服务模型 Reactor
模式Reactor
单线程;- 单
Reactor
多线程; - 主从
Reactor
多线程
1.1 传统阻塞 I/O 服务模型
- 模型特点
- 采用阻塞 IO 模式获取输入的数据
- 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回
- 存在问题
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费
1.2 Reactor线程模型
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
- 基于
I/O
复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理 - 基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
IO复用结合线程池,就是Reactor模式的基本设计思想。
Reactor 模式中核心组成
Reactor
:Reactor
在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO
事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;Handlers
:处理程序执行I/O
事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor
通过调度适当的处理程序来响应I/O
事件,处理程序执行非阻塞操作。
1.2.1 单 Reactor 单线程模型
-
处理流程
- Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求 ;
- Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发 ;
- 建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对 象处理连接完成后的后续业务处理 ;
- Handler 会完成 Read→业务处理→Send 的完整业务流程。
-
优缺点
- 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
- 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况
1.2.2 单Reactor多线程
- 处理流程
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后,通过 Dispatch 进行分发
- 如果建立连接请求,则右 Acceptor 通过 accept 处理连接请求,然后创建一个 Handler 对象处理完成连接后的各种事件
- 如果不是连接请求,则由 Reactor 分发调用连接对应的 handler 来处理
- handler 只负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
- handler 收到响应后,通过 send 将结果返回给 client
- 优缺点
- 优点:可以充分的利用多核 cpu 的处理能力
- 缺点:多线程数据共享和访问比较复杂,Reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
1.2.3 主从 Reactor 多线程
- 处理流程
Reactor
主线程 MainReactor 对象通过 select 监听连接事件,收到事件后,通过 Acceptor 处理连接事件- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
- subreactor 将连接加入到连接队列进行监听,并创建 handler 进行各种事件处理
- 当有新事件发生时,subreactor 就会调用对应的 handler 处理
- handler 通过 read 读取数据,分发给后面的 worker 线程处理
- worker 线程池分配独立的 worker 线程进行业务处理,并返回结果
- handler 收到响应的结果后,再通过 send 将结果返回给 client
- Reactor 主线程可以对应多个 Reactor 子线程,即 MainRecator 可以关联多个 SubReactor
1.2.4 Reactor线程模型小结
- 3中线程模型生活场景类比
- 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服务
- 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
- 主从 Reactor 多线程,多个前台接待员,多个服务生
- Reactor 模式具有如下的优点
- 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销
- 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
- 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性
1.3 Netty线程模型
Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。
1.3.1 简单版Netty模型
BossGroup
线程维护Selector
,只关注Accecpt
- 当接收到
Accept
事件,获取到对应的SocketChannel
,封装成NIOScoketChannel
并注册到Worker
线程(事件循环),并进行维护 - 当
Worker
线程监听到Selector
中通道发生自己感兴趣的事件后,就进行处理(就由handler
),注意handler
已经加入到通道
1.3.2 进阶版Netty模型
- 有两组线程池:
BossGroup
和WorkerGroup
,BossGroup
中的线程专门负责和客户端建立 连接,WorkerGroup
中的线程专门负责处理连接上的读写,BossGroup
和WorkerGroup
类型都是NioEventLoopGroup
; BossGroup
和WorkerGroup
含有多个不断循环的执行事件处理的线程,每个线程都包含一 个 Selector,用于监听注册在其上的 Channel;- 每个
BossGroup
中的线程循环执行以下三个步骤- 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到 WorkerGroup 中某个线程上的 Selector 上
- 处理任务队列的任务,即
runAllTasks
- 每个 WorkerGroup 中的线程循环执行以下三个步骤
- 轮询
read
,write
事件 - 处理
I/O
事件,即read
,write
事件,在对应NioScocketChannel
处理 - 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
Worker
NIOEventLoop
处理业务时,会使用pipeline
,pipeline
中包含了channel
,即通过pipeline
可以获取到对应通道,管道中维护了很多的处理器
1.3.3 详细版Netty模型
- Netty 抽象出两组线程池:BossGroup 和 WorkerGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的 线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是
NioEventLoopGroup
NioEventLoopGroup
相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLooNioEventLoop
表示一个不断循环的执行处理任务的线程,每个NioEventLoop
都有一个Selector
,用于监听绑定在其上的socket
的网络通讯NioEventLoopGroup
可以有多个线程,即可以含有多个NioEventLoop
- 每个
BossNioEventLoop
循环执行的步骤有3
步:- 轮询
accept
事件 - 处理
accept
事件,与client
建立连接,生成NioScocketChannel
,并将其注册到某个worker
NIOEventLoop
上的Selector
- 处理任务队列的任务,即
runAllTasks
- 轮询
- 每个
WorkerNIOEventLoop
循环执行的步骤:- 轮询
read
,write
事件 - 处理
I/O
事件,即read
,write
事件,在对应NioScocketChannel
处理 - 处理任务队列的任务,即
runAllTasks
- 轮询
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器 (拦截处理器、过滤处理器、自定义处理器等)。
2 Netty快速入门案例-TCP服务
2.1 服务端代码实现
package com.warybee.simple;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @description Netty开发一个服务端
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//1.创建bossGroup线程组: 处理网络连接事件 线程数默认为: 2 * 处理器线程数
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2.创建workerGroup线程组: 处理网络事件--读写事件 2 * 处理器线程数
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//3.创建服务端启动助手
ServerBootstrap bootstrap=new ServerBootstrap();
//4.设置线程组
bootstrap.group(bossGroup,workerGroup)
//5.设置服务端通道实现
.channel(NioServerSocketChannel.class)
//6.参数设置-设置线程队列中等待连接个数
.option(ChannelOption.SO_BACKLOG,128)
//7.参数设置-设置活跃状态,child是设置workerGroup
.childOption(ChannelOption.SO_KEEPALIVE,true)
8.创建一个通道初始化对象
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//9.向pipeline中添加自定义业务处理handler
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("服务端就绪");
//10.启动服务端并绑定端口,同时将异步改为同步
ChannelFuture channelFuture = bootstrap.bind(9999).sync();
//11.关闭通道(并不是真正意义上的关闭,而是监听通道关闭状态)和关闭连接池
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Netty服务端自定义Handler
package com.warybee.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
* @description 自定义服务端handle
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端消息
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx:"+ctx);
ByteBuf byteBuffer=(ByteBuf)msg;
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline();
System.out.println("客户端发送的消息是:"+byteBuffer.toString(StandardCharsets.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* 数据读取完成
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//发送消息给客户端
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",StandardCharsets.UTF_8));
}
/**
* 通道就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
/**
* 异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
2.2 客户端代码实现
package com.warybee.simple;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @description
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//创建线程组
EventLoopGroup eventExecutors = new NioEventLoopGroup();
try {
//创建客户端启动助手
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventExecutors)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
});
//启动客户端, 等待连接服务端,
ChannelFuture cf = bootstrap.connect("127.0.0.1", 9999).sync();
//关闭通道和关闭连接池
cf.channel().closeFuture().sync();
}
finally {
eventExecutors.shutdownGracefully();
}
}
}
Netty客户端自定义Handler
package com.warybee.simple;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;
/**
* @description Netty客户端自定义Handler
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 通道就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf=(ByteBuf) msg;
System.out.println("服务端回复的消息:"+byteBuf.toString(StandardCharsets.UTF_8));
System.out.println("服务器地址:"+ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
3 Netty任务队列
在使用Netty的时候,我们的业务处理都是放到我们自定义的handler里面,那么如果handler里面有一些执行比较耗时的操作的话,依旧会出现线程阻塞的情况,那么怎么来处理呢?
我们可以回过头去看看Netty的模型图,里面有一块是TaskQueue,这个就是Netty提供给我们的任务队列,可以用来异步处理任务,它是和channel是一一绑定的。
3.1 用户自定义的普通任务
在定义Handler里面通过ChannelHandlerContext
获取channel
/**
* 通道就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//TODO 这里可以执行耗时任务
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", StandardCharsets.UTF_8));
}
});
System.out.println("client "+ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", StandardCharsets.UTF_8));
}
3.2 用户自定义定时任务
任务是提交到 scheduleTaskQueue
中
/**
* 通道就绪
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//用户自定义定时任务,5秒后执行
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
//TODO 这里可以执行耗时任务
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
System.out.println("channel code=" + ctx.channel().hashCode());
} catch (Exception ex) {
System.out.println("发生异常" + ex.getMessage());
}
}
}, 5, TimeUnit.SECONDS);
}
最后
以上就是彩色八宝粥为你收集整理的Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列1 Netty线程模型2 Netty快速入门案例-TCP服务3 Netty任务队列的全部内容,希望文章能够帮你解决Netty系列(2)快速入门Netty线程模型、Netty入门程序、Netty任务队列1 Netty线程模型2 Netty快速入门案例-TCP服务3 Netty任务队列所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复