我是靠谱客的博主 炙热乐曲,最近开发中收集的这篇文章主要介绍netty 心跳检测,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

  1. 基于netty的心跳检测,有需要的朋友可以参考下。  
  2. 这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能。  
  3. 实现的功能:  
  4. 1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;  
  5. 2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;  
  6. 3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;  
  7. 4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;  
  8. 5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;  
  9. 6.只要收到客户端的ping消息,服务端心跳失败计数器清零;  
  10. 7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;  
  11. 服务端代码:  
  12. package com.kg.netty.server;  
  13. import io.netty.bootstrap.ServerBootstrap;  
  14. import io.netty.channel.ChannelFuture;  
  15. import io.netty.channel.ChannelHandlerContext;  
  16. import io.netty.channel.ChannelInitializer;  
  17. import io.netty.channel.ChannelPipeline;  
  18. import io.netty.channel.EventLoopGroup;  
  19. import io.netty.channel.SimpleChannelInboundHandler;  
  20. import io.netty.channel.nio.NioEventLoopGroup;  
  21. import io.netty.channel.socket.SocketChannel;  
  22. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  23. import io.netty.handler.codec.serialization.ClassResolvers;  
  24. import io.netty.handler.codec.serialization.ObjectDecoder;  
  25. import io.netty.handler.codec.serialization.ObjectEncoder;  
  26. import io.netty.handler.timeout.IdleState;  
  27. import io.netty.handler.timeout.IdleStateEvent;  
  28. import io.netty.handler.timeout.IdleStateHandler;  
  29. import java.util.concurrent.TimeUnit;  
  30. import com.kg.netty.msg.KeepAliveMessage;  
  31. import com.kg.utils.Constants;  
  32. import com.kg.utils.Utils;  
  33. public class KeepAliveServer {  
  34.     // 端口  
  35.     private int port ;  
  36.     public KeepAliveServer(int port) {  
  37.         this.port = port;  
  38.     }  
  39.       
  40.     ChannelFuture f ;  
  41.       
  42.     ServerBootstrap b ;  
  43.       
  44.     // 设置6秒检测chanel是否接受过心跳数据  
  45.     private static final int READ_WAIT_SECONDS = 6;  
  46.       
  47.     // 定义客户端没有收到服务端的pong消息的最大次数  
  48.     private static final int MAX_UN_REC_PING_TIMES = 3;  
  49.     public void startServer() {  
  50.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  51.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  52.         try {  
  53.             b = new ServerBootstrap();  
  54.             b.group(bossGroup, workerGroup);  
  55.             b.channel(NioServerSocketChannel.class);  
  56.             b.childHandler(new KeepAliveServerInitializer());  
  57.             // 服务器绑定端口监听  
  58.             f = b.bind(port).sync();  
  59.             // 监听服务器关闭监听,此方法会阻塞  
  60.             f.channel().closeFuture().sync();  
  61.             // 可以简写为  
  62.             /* b.bind(portNumber).sync().channel().closeFuture().sync(); */  
  63.         } catch (InterruptedException e) {  
  64.             e.printStackTrace();  
  65.         } finally {  
  66.             bossGroup.shutdownGracefully();  
  67.             workerGroup.shutdownGracefully();  
  68.         }  
  69.     }  
  70.     /** 
  71.      * 消息处理器 
  72.      * @author cullen edward 
  73.      */  
  74.     private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {  
  75.         @Override  
  76.         protected void initChannel(SocketChannel ch) throws Exception {  
  77.             ChannelPipeline pipeline = ch.pipeline();  
  78.               
  79.             /* 
  80.              * 使用ObjectDecoder和ObjectEncoder 
  81.              * 因为双向都有写数据和读数据,所以这里需要两个都设置 
  82.              * 如果只读,那么只需要ObjectDecoder即可 
  83.              */  
  84.             pipeline.addLast("decoder"new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));  
  85.             pipeline.addLast("encoder"new ObjectEncoder());  
  86.               
  87.             /* 
  88.              * 这里只监听读操作 
  89.              * 可以根据需求,监听写操作和总得操作 
  90.              */  
  91.             pipeline.addLast("pong"new IdleStateHandler(READ_WAIT_SECONDS, 00,TimeUnit.SECONDS));  
  92.               
  93.             pipeline.addLast("handler"new Heartbeat());  
  94.         }  
  95.     }  
  96.       
  97.     private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> {   
  98.           
  99.         // 失败计数器:未收到client端发送的ping请求  
  100.         private int unRecPingTimes = 0 ;  
  101.           
  102.         // 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象  
  103.         ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>();   
  104.           
  105.         @Override  
  106.         protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {  
  107.             System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());  
  108.             // 收到ping消息后,回复  
  109.             if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){  
  110.                 msg.setReqCode(Constants.RET_CODE);  
  111.                 ctx.channel().writeAndFlush(msg);  
  112.                 // 失败计数器清零  
  113.                 unRecPingTimes = 0;  
  114.                 if(localMsgInfo.get()==null){  
  115.                     KeepAliveMessage localMsg = new KeepAliveMessage();  
  116.                     localMsg.setSn(msg.getSn());  
  117.                     localMsgInfo.set(localMsg);  
  118.                     /* 
  119.                      * 这里可以将设备号放入一个集合中进行统一管理 
  120.                      */  
  121.                     // TODO  
  122.                 }  
  123.             }else{  
  124.                 ctx.channel().close();  
  125.             }  
  126.         }  
  127.           
  128.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  129.             if (evt instanceof IdleStateEvent) {  
  130.                 IdleStateEvent event = (IdleStateEvent) evt;  
  131.                 if (event.state() == IdleState.READER_IDLE) {  
  132.                     /*读超时*/  
  133.                     System.out.println("===服务端===(READER_IDLE 读超时)");  
  134.                     // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连  
  135.                     if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){  
  136.                         System.out.println("===服务端===(读超时,关闭chanel)");  
  137.                         // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连  
  138.                         ctx.channel().close();  
  139.                     }else{  
  140.                         // 失败计数器加1  
  141.                         unRecPingTimes++;  
  142.                     }  
  143.                 } else if (event.state() == IdleState.WRITER_IDLE) {  
  144.                     /*写超时*/     
  145.                     System.out.println("===服务端===(WRITER_IDLE 写超时)");  
  146.                 } else if (event.state() == IdleState.ALL_IDLE) {  
  147.                     /*总超时*/  
  148.                     System.out.println("===服务端===(ALL_IDLE 总超时)");  
  149.                 }  
  150.             }  
  151.         }  
  152.           
  153.         @Override  
  154.         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  155.             System.out.println("错误原因:"+cause.getMessage());  
  156.             if(localMsgInfo.get()!=null){  
  157.                 /* 
  158.                  * 从管理集合中移除设备号等唯一标示,标示设备离线 
  159.                  */  
  160.                 // TODO  
  161.             }  
  162.             ctx.channel().close();  
  163.         }  
  164.         @Override  
  165.         public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  166.             System.out.println("Client active ");  
  167.             super.channelActive(ctx);  
  168.         }  
  169.           
  170.         @Override  
  171.         public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
  172.             // 关闭,等待重连  
  173.             ctx.close();  
  174.             if(localMsgInfo.get()!=null){  
  175.                 /* 
  176.                  * 从管理集合中移除设备号等唯一标示,标示设备离线 
  177.                  */  
  178.                 // TODO  
  179.             }  
  180.             System.out.println("===服务端===(客户端失效)");  
  181.         }  
  182.     }  
  183.       
  184.     public void stopServer(){  
  185.         if(f!=null){  
  186.             f.channel().close();  
  187.         }  
  188.     }  
  189.     /** 
  190.      * @param args 
  191.      */  
  192.     public static void main(String[] args) {  
  193.         KeepAliveServer keepAliveServer = new KeepAliveServer(1666);  
  194.         keepAliveServer.startServer();  
  195.     }  
  196. }  
  197. 客户端代码:  
  198. package com.kg.netty.client;  
  199. import io.netty.bootstrap.Bootstrap;  
  200. import io.netty.channel.Channel;  
  201. import io.netty.channel.ChannelHandlerContext;  
  202. import io.netty.channel.ChannelInitializer;  
  203. import io.netty.channel.ChannelPipeline;  
  204. import io.netty.channel.EventLoopGroup;  
  205. import io.netty.channel.SimpleChannelInboundHandler;  
  206. import io.netty.channel.nio.NioEventLoopGroup;  
  207. import io.netty.channel.socket.SocketChannel;  
  208. import io.netty.channel.socket.nio.NioSocketChannel;  
  209. import io.netty.handler.codec.serialization.ClassResolvers;  
  210. import io.netty.handler.codec.serialization.ObjectDecoder;  
  211. import io.netty.handler.codec.serialization.ObjectEncoder;  
  212. import io.netty.handler.timeout.IdleState;  
  213. import io.netty.handler.timeout.IdleStateEvent;  
  214. import io.netty.handler.timeout.IdleStateHandler;  
  215. import java.util.concurrent.Executors;  
  216. import java.util.concurrent.ScheduledExecutorService;  
  217. import java.util.concurrent.TimeUnit;  
  218. import com.kg.netty.msg.KeepAliveMessage;  
  219. import com.kg.utils.Constants;  
  220. public class KeepAliveClient {  
  221.     private String host ;  
  222.     private int port ;  
  223.       
  224.     private EventLoopGroup group ;  
  225.       
  226.     private Bootstrap b ;  
  227.       
  228.     private Channel ch ;  
  229.       
  230.     // 定义客户端没有收到服务端的pong消息的最大次数  
  231.     private static final int MAX_UN_REC_PONG_TIMES = 3;  
  232.       
  233.     // 多长时间未请求后,发送心跳  
  234.     private static final int WRITE_WAIT_SECONDS = 5;  
  235.       
  236.     // 隔N秒后重连  
  237.     private static final int RE_CONN_WAIT_SECONDS = 5;  
  238.       
  239.     // 客户端连续N次没有收到服务端的pong消息  计数器  
  240.     private int unRecPongTimes = 0 ;  
  241.       
  242.     private ScheduledExecutorService executorService ;  
  243.       
  244.     // 是否停止  
  245.     private boolean isStop = false ;  
  246.     public KeepAliveClient(String host, int port) {  
  247.         this.host = host ;  
  248.         this.port = port ;  
  249.         group = new NioEventLoopGroup();  
  250.         b = new Bootstrap();  
  251.         b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());  
  252.     }  
  253.     public void start() {  
  254.         connServer();  
  255.     }  
  256.       
  257.     private void connServer(){  
  258.           
  259.         isStop = false;  
  260.           
  261.         if(executorService!=null){  
  262.             executorService.shutdown();  
  263.         }  
  264.         executorService = Executors.newScheduledThreadPool(1);  
  265.         executorService.scheduleWithFixedDelay(new Runnable() {  
  266.               
  267.             boolean isConnSucc = true;  
  268.               
  269.             @Override  
  270.             public void run() {  
  271.                 try {  
  272.                     // 重置计数器  
  273.                     unRecPongTimes = 0;  
  274.                     // 连接服务端  
  275.                     if(ch!=null&&ch.isOpen()){  
  276.                         ch.close();  
  277.                     }  
  278.                     ch = b.connect(host, port).sync().channel();  
  279.                     // 此方法会阻塞  
  280. //                  ch.closeFuture().sync();  
  281.                     System.out.println("connect server finish");  
  282.                 } catch (Exception e) {  
  283.                     e.printStackTrace();  
  284.                     isConnSucc = false ;  
  285.                 } finally{  
  286.                     if(isConnSucc){  
  287.                         if(executorService!=null){  
  288.                             executorService.shutdown();  
  289.                         }  
  290.                     }  
  291.                 }  
  292.             }  
  293.         }, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);  
  294.     }  
  295.       
  296.     public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {  
  297.            
  298.         @Override  
  299.         protected void initChannel(SocketChannel ch) throws Exception {  
  300.             ChannelPipeline pipeline = ch.pipeline();  
  301.        
  302.             pipeline.addLast("decoder"new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));  
  303.             pipeline.addLast("encoder"new ObjectEncoder());  
  304.        
  305.             pipeline.addLast("ping"new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));  
  306.             // 客户端的逻辑  
  307.             pipeline.addLast("handler"new ClientHandler());  
  308.         }  
  309.     }  
  310.     public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {  
  311.            
  312.         @Override  
  313.         protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)  
  314.                 throws Exception {  
  315.             System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());  
  316.             if (Constants.RET_CODE == msg.getReqCode()) {  
  317.                 // 计数器清零  
  318.                 unRecPongTimes = 0;  
  319.             }  
  320.         }  
  321.           
  322.         @Override  
  323.         public void channelActive(ChannelHandlerContext ctx) throws Exception {  
  324.             System.out.println("Client active ");  
  325.             super.channelActive(ctx);  
  326.         }  
  327.         @Override  
  328.         public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
  329.             System.out.println("Client close ");  
  330.             super.channelInactive(ctx);  
  331.             /* 
  332.              * 重连 
  333.              */  
  334.             if(!isStop){  
  335.                 connServer();  
  336.             }  
  337.         }  
  338.           
  339.         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
  340.             if (evt instanceof IdleStateEvent) {  
  341.                 IdleStateEvent event = (IdleStateEvent) evt;  
  342.                 if (event.state() == IdleState.READER_IDLE) {  
  343.                     /*读超时*/  
  344.                     System.out.println("===服务端===(READER_IDLE 读超时)");  
  345.                 } else if (event.state() == IdleState.WRITER_IDLE) {  
  346.                     /*写超时*/     
  347.                     System.out.println("===服务端===(WRITER_IDLE 写超时)");  
  348.                     if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){  
  349.                         ctx.channel().writeAndFlush(getSrcMsg()) ;  
  350.                         unRecPongTimes++;  
  351.                     }else{  
  352.                         ctx.channel().close();  
  353.                     }  
  354.                 } else if (event.state() == IdleState.ALL_IDLE) {  
  355.                     /*总超时*/  
  356.                     System.out.println("===服务端===(ALL_IDLE 总超时)");  
  357.                 }  
  358.             }  
  359.         }  
  360.     }  
  361.       
  362.     private KeepAliveMessage getSrcMsg(){  
  363.         KeepAliveMessage keepAliveMessage = new KeepAliveMessage();  
  364.         // 设备码  
  365.         keepAliveMessage.setSn("sn_123456abcdfef");  
  366.         keepAliveMessage.setReqCode(Constants.REQ_CODE);  
  367.         return keepAliveMessage ;  
  368.     }  
  369.       
  370.     public void stop(){  
  371.         isStop = true;  
  372.         if(ch!=null&&ch.isOpen()){  
  373.             ch.close();  
  374.         }  
  375.         if(executorService!=null){  
  376.             executorService.shutdown();  
  377.         }  
  378.     }  
  379.       
  380.     /** 
  381.      * @param args 
  382.      */  
  383.     public static void main(String[] args) {  
  384.         KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);  
  385.         keepAliveServer.start();  
  386.     }  
  387. }  
  388. 参考网站:  
  389. http://coder.beitown.com/archives/1180  
  390. 下载工程,请猛戳  
  391. http://download.csdn.net/detail/asd13141718/8492741 

最后

以上就是炙热乐曲为你收集整理的netty 心跳检测的全部内容,希望文章能够帮你解决netty 心跳检测所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部