我是靠谱客的博主 坦率纸飞机,最近开发中收集的这篇文章主要介绍Netty案例(五)之应用层心跳检测netty版本心跳检测,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • netty版本
  • 心跳检测

netty版本

  1. netty版本:io.netty:netty-all:4.1.33.Final

心跳检测

  1. 心跳:即在TCP长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保TCP连接的有效性。

  2. 为什么需要心跳?

    • 因为网络的不可靠性, 有可能在TCP保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断。在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的。为了解决这个问题, 我们就需要引入心跳机制。
  3. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle(空闲)状态时, 客户端或服务器会发送一个特殊的数据包(一般都是自定义)给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据包回应发送方, 即一个PING-PONG交互。自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保TCP连接的有效性。一般情况下,心跳检测到连接失效后,还需要配合重连来进行进一步的重新连接。

  4. 心跳机制

    • 使用TCP协议层的keepalive机制
    • 应用层自定义心跳机制,这里主要讲应用层实现
  5. 在Netty中, 实现心跳机制的关键是IdleStateHandlerIdleStateHandler所产生的IdleStateEvent的处理逻辑。在userEventTriggered中, 根据 IdleStateEventstate()的不同, 而进行不同的处理。

        /**
         * 心跳既可以从客户端发起也可以从服务端发起,此处从客户端发起
         */
        public class HeartbeatClient extends AbstractClient {
        
            // 读超时
            private static final int READ_IDEL_TIME_OUT = 4;
            // 写超时
            private static final int WRITE_IDEL_TIME_OUT = 5;
            // 所有超时
            private static final int ALL_IDEL_TIME_OUT = 7;
        
            public HeartbeatClient(String host, int port) {
                super(host, port);
            }
        
            @Override
            public ChannelHandler[] getChannelHandlers() {
                return new ChannelHandler[]{
                        new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS),
                        new HeartbeatClientHandler()
                };
            }
        
            public static void main(String[] args) throws Throwable {
                new HeartbeatClient("127.0.0.1", 8080).start();
        
            }
        
        }
    
    
        public class HeartbeatClientHandler extends SimpleChannelInboundHandler<String> {
        
            private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientHandler.class);
        
            private int heartbeatCount = 0;
        
            /**
             * 丢失连接的次数,读,写,all
             */
            private AtomicInteger read_loss_connect_times = new AtomicInteger(0);
            private AtomicInteger write_loss_connect_times = new AtomicInteger(0);
            private AtomicInteger all_loss_connect_times = new AtomicInteger(0);
        
        
            @Override
            protected void channelRead0(ChannelHandlerContext context, String msg) {
                //服务端回复PONG
                if ("PONG".equals(msg)) {
                    read_loss_connect_times.getAndDecrement();
                    LOGGER.info("{} get pong msg from {}", context.channel().remoteAddress());
                }
            }
        
            protected void sendPingMsg(ChannelHandlerContext context) {
                context.writeAndFlush("PING");
                heartbeatCount++;
                LOGGER.info("sent ping msg to {} , count: ", context.channel().remoteAddress(), heartbeatCount);
            }
        
        
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                /***
                 *  IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑.
                 *  在 userEventTriggered 中, 根据 IdleStateEvent 的 state() 的不同, 
                 *  而进行不同的处理. 例如如果是读取数据 idle, 则 e.state() == READER_IDLE, 
                 *  因此就调用 handleReaderIdle 来处理它.
                 */
                if (evt instanceof IdleStateEvent) {
                    IdleStateEvent e = (IdleStateEvent) evt;
                    switch (e.state()) {
                        case READER_IDLE://读空闲
                            handleReaderIdle(ctx);
                            break;
                        case WRITER_IDLE:
                            handleWriterIdle(ctx);
                            break;
                        case ALL_IDLE:
                            handleAllIdle(ctx);
                            break;
                        default:
                            break;
                    }
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }
        
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                LOGGER.info("{}  is active", ctx.channel().remoteAddress());
            }
        
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                LOGGER.info("{} is inactive", ctx.channel().remoteAddress());
            }
        
            protected void handleReaderIdle(ChannelHandlerContext ctx) {
                sendPingMsg(ctx);
                LOGGER.info("读空闲,丢失连接的总次数:{}", read_loss_connect_times.getAndIncrement());
                if (read_loss_connect_times.get() > 3) {
                    LOGGER.info("已经超过{}次,关闭这个不活跃的连接", 3);
                    ctx.channel().close();
                }
            }
        
            protected void handleWriterIdle(ChannelHandlerContext ctx) {
                LOGGER.info("写空闲,丢失连接的总次数:{}", write_loss_connect_times.incrementAndGet());
            }
        
        
            protected void handleAllIdle(ChannelHandlerContext ctx) {
                LOGGER.info("所有空闲,丢失连接的总次数:{}", all_loss_connect_times.incrementAndGet());
            }
        
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                LOGGER.error(cause.getMessage(), cause);
                ctx.close();
            }
        
        
        }
        
    
    

最后

以上就是坦率纸飞机为你收集整理的Netty案例(五)之应用层心跳检测netty版本心跳检测的全部内容,希望文章能够帮你解决Netty案例(五)之应用层心跳检测netty版本心跳检测所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(45)

评论列表共有 0 条评论

立即
投稿
返回
顶部