我是靠谱客的博主 隐形帆布鞋,最近开发中收集的这篇文章主要介绍【Netty4】Netty心跳检测机制2 IdleStateHandler原理分析1. Netty心跳检测机制2. 源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 1. Netty心跳检测机制
  • 2. 源码分析

系列文章:
《Netty心跳检测机制1 IdleStateHandler示例》
《Netty心跳检测机制2 IdleStateHandler原理分析》

1. Netty心跳检测机制

所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性。

在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 看下它的构造器:

 public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
   this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

这里解释下三个参数的含义:

  • readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的IdleStateEvent 事件.
  • writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的IdleStateEvent 事件.
  • allIdleTimeSeconds: 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:

IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

2. 源码分析

继承关系图:
在这里插入图片描述
IdleStateHandler间接继承了ChannelInboundHandlerAdapter,并且重写了channelRead方法:
在这里插入图片描述

红框代码其实表示该方法只是进行了透传,不做任何业务逻辑处理,让channelPipe中的下一个handler处理channelRead方法。

我们再看看channelActive方法:
在这里插入图片描述
这里有个initialize的方法,这是IdleStateHandler的精髓,接着探究:

    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

这边会调用schedule()方法:

    ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        //往线程池添加一个task
        return ctx.executor().schedule(task, delay, unit);
    }

进而触发一个Task,ReaderIdleTimeoutTask,这个task里的run方法源码是这样的:

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
              //[1]计算差值,判断是否超时
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    //[2]调用 ctx.fireUserEventTriggered(evt);传播事件至下一个handler
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

[1]处代码是用当前时间(ticksInNanos()方法返回当前时间)减去最后一次channelRead方法调用的时间,假如这个结果是6s,说明最后一次调用channelRead已经是6s之前的事情了,你设置的是5s,那么nextDelay则为-1,说明超时了,那么[2]处代码则会触发下一个handler的
userEventTriggered方法:

    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

ctx.fireUserEventTriggered(evt);是不是不熟悉?很简单,和ctx.fireChannelRead(msg);类似,是传递消息至下一个handler,前者触发下一个handler中的fireUserEventTriggered()方法,后者触发下一个handler中的fireChannelRead()方法。至此,用户可以在复写的fireUserEventTriggered()内,实现自己的业务逻辑,比如关闭连接操作等待。

如果没有超时则不触发userEventTriggered方法。

参考:
《Netty学习(五)—IdleStateHandler心跳机制》

最后

以上就是隐形帆布鞋为你收集整理的【Netty4】Netty心跳检测机制2 IdleStateHandler原理分析1. Netty心跳检测机制2. 源码分析的全部内容,希望文章能够帮你解决【Netty4】Netty心跳检测机制2 IdleStateHandler原理分析1. Netty心跳检测机制2. 源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(43)

评论列表共有 0 条评论

立即
投稿
返回
顶部