概述
文章目录
- 1. Netty心跳检测机制
- 2. 源码分析
系列文章:
《Netty心跳检测机制1 IdleStateHandler示例》
《Netty心跳检测机制2 IdleStateHandler原理分析》
1. Netty心跳检测机制
所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。
在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
这里解释下三个参数的含义:
readerIdleTimeSeconds
: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的IdleStateEvent 事件.writerIdleTimeSeconds
: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的IdleStateEvent 事件.allIdleTimeSeconds
: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.
注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:
pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
2. 源码分析
继承关系图:
IdleStateHandler间接继承了ChannelInboundHandlerAdapter,并且重写了channelRead方法:
红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法。
我们再看看channelActive方法:
这里有个initialize的方法,这是IdleStateHandler的精髓,接着探究:
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
这边会调用schedule()方法:
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//往线程池添加一个task
return ctx.executor().schedule(task, delay, unit);
}
进而触发一个Task,ReaderIdleTimeoutTask,这个task里的run方法源码是这样的:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
//[1]计算差值,判断是否超时
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
//[2]调用 ctx.fireUserEventTriggered(evt);传播事件至下一个handler
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
[1]处代码是用当前时间(ticksInNanos()方法返回当前时间)减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么[2]处代码则会触发下一个handler的
userEventTriggered方法:
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
ctx.fireUserEventTriggered(evt);
是不是不熟悉?很简单,和ctx.fireChannelRead(msg);
类似,是传递消息至下一个handler,前者触发下一个handler中的fireUserEventTriggered()方法,后者触发下一个handler中的fireChannelRead()方法。至此,用户可以在复写的fireUserEventTriggered()内,实现自己的业务逻辑,比如关闭连接操作等待。
如果没有超时则不触发userEventTriggered方法。
参考:
《Netty学习(五)—IdleStateHandler心跳机制》
最后
以上就是隐形帆布鞋为你收集整理的【Netty4】Netty心跳检测机制2 IdleStateHandler原理分析1. Netty心跳检测机制2. 源码分析的全部内容,希望文章能够帮你解决【Netty4】Netty心跳检测机制2 IdleStateHandler原理分析1. Netty心跳检测机制2. 源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复