我是靠谱客的博主 无辜溪流,这篇文章主要介绍Netty4实现心跳检测及IdleStateHandler源码分析,现在分享给大家,希望可以做个参考。

1.什么是心跳检测?

    判断对方是否正常运行,一般采用定时发送简单的通讯包,如果在指定时间内未接收到对方响应,则判定对方已经宕掉。用于检测TCP的异常断开。

    心跳包一般就是客户端发送给服务端的简单消息,如果服务端几分钟内没有收到客户端消息,则视为客户端已经断开,这个时候就主动关闭客户端的通道。

 

2.使用Netty实现服务端心跳检测

    下面我们编写服务端代码,服务端实现以下功能:如果在N长时间内没有接受到客户端连接,则发送一段信息给客户端,并关闭其通道

 

    * 我们创建服务端心跳检测的Handler,命名为HeartBeatHandler,并重写userEventTriggered方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/** * 服务端心跳检测 */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent ise = (IdleStateEvent)evt; // 服务端读空闲 if(ise.state().equals(IdleState.READER_IDLE)){ ctx.writeAndFlush("client reader idle, channel will close"); ctx.channel().close(); // 服务端写空闲 }else if (ise.state().equals(IdleState.WRITER_IDLE)){ ctx.write("pong message"); // 服务端读写空闲 }else if (ise.state().equals(IdleState.ALL_IDLE)){ ctx.channel().close(); }else{ // DO NOTHING } } } }

    * 服务端代码,如下

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Server { public static void main(String[] args) { //服务类 ServerBootstrap bootstrap = new ServerBootstrap(); //boss和worker EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { //设置线程池 bootstrap.group(boss, worker); //设置socket工厂 bootstrap.channel(NioServerSocketChannel.class); //设置管道工厂 bootstrap.childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); // 主要在这里,先创建一个IdleStateHandler,10秒内没有读写则判定为空闲 ch.pipeline().addLast(new IdleStateHandler(10, 10, 10, TimeUnit.SECONDS)); // 然后执行心跳检测 ch.pipeline().addLast(new HeartBeatHandler()); } }); //设置参数,TCP参数 bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接 bootstrap.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送 //绑定端口 ChannelFuture future = bootstrap.bind(8088).sync(); System.out.println("server start..."); //等待服务端关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally{ //释放资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }

    * 启动服务端并测试

    启动服务端之后,我们来开启一个客户端,还是使用telnet的方式

    连接到服务端之后,如果我们十秒钟不发送数据的话,则会被强制关闭连接,如下图所示

 

3.关于IdleStateHandler源码分析

 

    1)IdleStateHandler结构分析

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class IdleStateHandler extends ChannelDuplexHandler { // 对象初始化的时候初始化好这些数据 private final long readerIdleTimeNanos; private final long writerIdleTimeNanos; private final long allIdleTimeNanos; // 通过构造函数可知 public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { if (unit == null) { throw new NullPointerException("unit"); } if (readerIdleTime <= 0) { readerIdleTimeNanos = 0; } else { readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS); } if (writerIdleTime <= 0) { writerIdleTimeNanos = 0; } else { writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS); } if (allIdleTime <= 0) { allIdleTimeNanos = 0; } else { allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS); } } // 在channelRegistered/channelActive的时候调用初始化方法initialize方法 @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { // Initialize early if channel is active already. if (ctx.channel().isActive()) { initialize(ctx); } super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added // before channelActive() event is fired. If a user adds this handler // after the channelActive() event, initialize() will be called by beforeAdd(). initialize(ctx); super.channelActive(ctx); } // 监听到客户端写数据的时候,更新lastReadTime @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lastReadTime = System.nanoTime(); firstReaderIdleEvent = firstAllIdleEvent = true; ctx.fireChannelRead(msg); }

    总结:通过以上对IdleStateHandler的分析,可知:

    * 在构造IdleStateHandler的时候,便使用创建参数确定读、写、读写的空闲时间,并转换为millsecond

    * 当客户端连接到服务端,并且其channel处于活动状态,则调用initialize初始化方法

    * 当服务端监听到客户端发送数据时,即channelRead方法,则更新lastReadTime参数,即最近一次读时间

 

    可知,最重要的逻辑应该再initialize方法中,下面我们就来分析一下该方法

 

    2)initialize()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private void initialize(ChannelHandlerContext ctx) { // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 // private volatile int state; // 0 - none, 1 - initialized, 2 - destroyed switch (state) { case 1: case 2: return; } state = 1; // 1.获取线程池 EventExecutor loop = ctx.executor(); // 2.初始化最后一次读时间和最后一次写时间为当前时间 lastReadTime = lastWriteTime = System.nanoTime(); // 3.根据用户设置的读空闲时间启动一个定时任务,读空闲时间为频率执行 if (readerIdleTimeNanos > 0) { readerIdleTimeout = loop.schedule( new ReaderIdleTimeoutTask(ctx), readerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 4.根据用户设置的写空闲时间启动一个定时任务,写空闲时间为频率执行 if (writerIdleTimeNanos > 0) { writerIdleTimeout = loop.schedule( new WriterIdleTimeoutTask(ctx), writerIdleTimeNanos, TimeUnit.NANOSECONDS); } // 5.根据用户设置的读写空闲时间启动一个定时任务,读写空闲时间为频率执行 if (allIdleTimeNanos > 0) { allIdleTimeout = loop.schedule( new AllIdleTimeoutTask(ctx), allIdleTimeNanos, TimeUnit.NANOSECONDS); } }

    总结:由上可知,实现心跳检测的关键就在这个schedule上,根据用户设置的空闲时间来确定定时任务的频率。

    那么,定时任务是如何做到检测的呢?我们继续来看

 

    3)ReaderIdleTimeoutTask

    源码如下:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private final class ReaderIdleTimeoutTask implements Runnable { private final ChannelHandlerContext ctx; ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { this.ctx = ctx; } @Override public void run() { if (!ctx.channel().isOpen()) { return; } // 1.获取当前时间和最后一次读时间 long currentTime = System.nanoTime(); long lastReadTime = IdleStateHandler.this.lastReadTime; // nextDelay即为比较用户设置的读空闲时间和 当前时间-最后一次时间 long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime); // 2.如果nextDelay<=0,则说明客户端已经在超过读空闲时间内没有写入数据了 if (nextDelay <= 0) { // 2.1 重新定义readerIdleTimeout schedule,与initialize方法设置的相同,继续执行定时任务 readerIdleTimeout = ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); try { // 2.2 event = new IdleStateEvent(IdleState.READER_IDLE, true),将event设置为读空闲 IdleStateEvent event; if (firstReaderIdleEvent) { firstReaderIdleEvent = false; event = IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT; } else { event = IdleStateEvent.READER_IDLE_STATE_EVENT; } // 2.3 channelIdle的主要工作就是将evt传输给下一个Handler channelIdle(ctx, event); } catch (Throwable t) { ctx.fireExceptionCaught(t); } // 3.如果nextDelay>0,则说明客户端在规定时间内已经写入数据了, } else { // 3.1 重新定义readerIdleTimeout schedule,以nextDelay为执行频率 readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS); } } }

    由以上源码分析可知,ReaderIdleTimeoutTask的主要功能就是:

    * 根据当前时间和用户最后一次读时间来确实用户是否写入超时

    * 如果已经写超时,则重新定义schedule任务,还是以readerIdleTimeNanos为频率来执行。并设置evt为READ_IDLE,并传递给下一个Handler

    * 如果没有写超时,则继续执行schedule任务,此时以nextDelay(readerIdleTimeNanos - (currentTime - lastReadTime))为频率来执行

    

    4)后续处理

    READ_IDLE事件传递给下一个Handler,在本例中即为传递给HeartBeatHandler,其重新定义了userEventTriggered方法,此时正好接收到READ_IDLE事件,做相应处理即可

 

 

参考:Netty in Action

 

 

最后

以上就是无辜溪流最近收集整理的关于Netty4实现心跳检测及IdleStateHandler源码分析的全部内容,更多相关Netty4实现心跳检测及IdleStateHandler源码分析内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(76)

评论列表共有 0 条评论

立即
投稿
返回
顶部