概述
1.什么是心跳检测?
判断对方是否正常运行,一般采用定时发送简单的通讯包,如果在指定时间内未接收到对方响应,则判定对方已经宕掉。用于检测TCP的异常断开。
心跳包一般就是客户端发送给服务端的简单消息,如果服务端几分钟内没有收到客户端消息,则视为客户端已经断开,这个时候就主动关闭客户端的通道。
2.使用Netty实现服务端心跳检测
下面我们编写服务端代码,服务端实现以下功能:如果在N长时间内没有接受到客户端连接,则发送一段信息给客户端,并关闭其通道
* 我们创建服务端心跳检测的Handler,命名为HeartBeatHandler,并重写userEventTriggered方法
/**
* 服务端心跳检测
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent ise = (IdleStateEvent)evt;
// 服务端读空闲
if(ise.state().equals(IdleState.READER_IDLE)){
ctx.writeAndFlush("client reader idle, channel will close");
ctx.channel().close();
// 服务端写空闲
}else if (ise.state().equals(IdleState.WRITER_IDLE)){
ctx.write("pong message");
// 服务端读写空闲
}else if (ise.state().equals(IdleState.ALL_IDLE)){
ctx.channel().close();
}else{
// DO NOTHING
}
}
}
}
* 服务端代码,如下
public class Server {
public static void main(String[] args) {
//服务类
ServerBootstrap bootstrap = new ServerBootstrap();
//boss和worker
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
//设置线程池
bootstrap.group(boss, worker);
//设置socket工厂
bootstrap.channel(NioServerSocketChannel.class);
//设置管道工厂
bootstrap.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
// 主要在这里,先创建一个IdleStateHandler,10秒内没有读写则判定为空闲
ch.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.SECONDS));
// 然后执行心跳检测
ch.pipeline().addLast(new HeartBeatHandler());
}
});
//设置参数,TCP参数
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
//绑定端口
ChannelFuture future = bootstrap.bind(8088).sync();
System.out.println("server start...");
//等待服务端关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally{
//释放资源
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
* 启动服务端并测试
启动服务端之后,我们来开启一个客户端,还是使用telnet的方式
连接到服务端之后,如果我们十秒钟不发送数据的话,则会被强制关闭连接,如下图所示
3.关于IdleStateHandler源码分析
1)IdleStateHandler结构分析
public class IdleStateHandler extends ChannelDuplexHandler {
// 对象初始化的时候初始化好这些数据
private final long readerIdleTimeNanos;
private final long writerIdleTimeNanos;
private final long allIdleTimeNanos;
// 通过构造函数可知
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
// 在channelRegistered/channelActive的时候调用初始化方法initialize方法
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Initialize early if channel is active already.
if (ctx.channel().isActive()) {
initialize(ctx);
}
super.channelRegistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
// 监听到客户端写数据的时候,更新lastReadTime
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lastReadTime = System.nanoTime();
firstReaderIdleEvent = firstAllIdleEvent = true;
ctx.fireChannelRead(msg);
}
总结:通过以上对IdleStateHandler的分析,可知:
* 在构造IdleStateHandler的时候,便使用创建参数确定读、写、读写的空闲时间,并转换为millsecond
* 当客户端连接到服务端,并且其channel处于活动状态,则调用initialize初始化方法
* 当服务端监听到客户端发送数据时,即channelRead方法,则更新lastReadTime参数,即最近一次读时间
可知,最重要的逻辑应该再initialize方法中,下面我们就来分析一下该方法
2)initialize()
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
// private volatile int state; // 0 - none, 1 - initialized, 2 - destroyed
switch (state) {
case 1:
case 2:
return;
}
state = 1;
// 1.获取线程池
EventExecutor loop = ctx.executor();
// 2.初始化最后一次读时间和最后一次写时间为当前时间
lastReadTime = lastWriteTime = System.nanoTime();
// 3.根据用户设置的读空闲时间启动一个定时任务,读空闲时间为频率执行
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = loop.schedule(
new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
// 4.根据用户设置的写空闲时间启动一个定时任务,写空闲时间为频率执行
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = loop.schedule(
new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
// 5.根据用户设置的读写空闲时间启动一个定时任务,读写空闲时间为频率执行
if (allIdleTimeNanos > 0) {
allIdleTimeout = loop.schedule(
new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
总结:由上可知,实现心跳检测的关键就在这个schedule上,根据用户设置的空闲时间来确定定时任务的频率。
那么,定时任务是如何做到检测的呢?我们继续来看
3)ReaderIdleTimeoutTask
源码如下:
private final class ReaderIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx;
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
// 1.获取当前时间和最后一次读时间
long currentTime = System.nanoTime();
long lastReadTime = IdleStateHandler.this.lastReadTime;
// nextDelay即为比较用户设置的读空闲时间和 当前时间-最后一次时间
long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
// 2.如果nextDelay<=0,则说明客户端已经在超过读空闲时间内没有写入数据了
if (nextDelay <= 0) {
// 2.1 重新定义readerIdleTimeout schedule,与initialize方法设置的相同,继续执行定时任务
readerIdleTimeout =
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
try {
// 2.2 event = new IdleStateEvent(IdleState.READER_IDLE, true),将event设置为读空闲
IdleStateEvent event;
if (firstReaderIdleEvent) {
firstReaderIdleEvent = false;
event = IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT;
} else {
event = IdleStateEvent.READER_IDLE_STATE_EVENT;
}
// 2.3 channelIdle的主要工作就是将evt传输给下一个Handler
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
// 3.如果nextDelay>0,则说明客户端在规定时间内已经写入数据了,
} else {
// 3.1 重新定义readerIdleTimeout schedule,以nextDelay为执行频率
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
由以上源码分析可知,ReaderIdleTimeoutTask的主要功能就是:
* 根据当前时间和用户最后一次读时间来确实用户是否写入超时
* 如果已经写超时,则重新定义schedule任务,还是以readerIdleTimeNanos为频率来执行。并设置evt为READ_IDLE,并传递给下一个Handler
* 如果没有写超时,则继续执行schedule任务,此时以nextDelay(readerIdleTimeNanos - (currentTime - lastReadTime))为频率来执行
4)后续处理
READ_IDLE事件传递给下一个Handler,在本例中即为传递给HeartBeatHandler,其重新定义了userEventTriggered方法,此时正好接收到READ_IDLE事件,做相应处理即可
参考:Netty in Action
最后
以上就是无辜溪流为你收集整理的Netty4实现心跳检测及IdleStateHandler源码分析的全部内容,希望文章能够帮你解决Netty4实现心跳检测及IdleStateHandler源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复