概述
- 基于netty的心跳检测,有需要的朋友可以参考下。
- 这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能。
- 实现的功能:
- 1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;
- 2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;
- 3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;
- 4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;
- 5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;
- 6.只要收到客户端的ping消息,服务端心跳失败计数器清零;
- 7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;
- 服务端代码:
- package com.kg.netty.server;
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.util.concurrent.TimeUnit;
- import com.kg.netty.msg.KeepAliveMessage;
- import com.kg.utils.Constants;
- import com.kg.utils.Utils;
- public class KeepAliveServer {
- // 端口
- private int port ;
- public KeepAliveServer(int port) {
- this.port = port;
- }
- ChannelFuture f ;
- ServerBootstrap b ;
- // 设置6秒检测chanel是否接受过心跳数据
- private static final int READ_WAIT_SECONDS = 6;
- // 定义客户端没有收到服务端的pong消息的最大次数
- private static final int MAX_UN_REC_PING_TIMES = 3;
- public void startServer() {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- b = new ServerBootstrap();
- b.group(bossGroup, workerGroup);
- b.channel(NioServerSocketChannel.class);
- b.childHandler(new KeepAliveServerInitializer());
- // 服务器绑定端口监听
- f = b.bind(port).sync();
- // 监听服务器关闭监听,此方法会阻塞
- f.channel().closeFuture().sync();
- // 可以简写为
- /* b.bind(portNumber).sync().channel().closeFuture().sync(); */
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
- /**
- * 消息处理器
- * @author cullen edward
- */
- private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- /*
- * 使用ObjectDecoder和ObjectEncoder
- * 因为双向都有写数据和读数据,所以这里需要两个都设置
- * 如果只读,那么只需要ObjectDecoder即可
- */
- pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
- pipeline.addLast("encoder", new ObjectEncoder());
- /*
- * 这里只监听读操作
- * 可以根据需求,监听写操作和总得操作
- */
- pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
- pipeline.addLast("handler", new Heartbeat());
- }
- }
- private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> {
- // 失败计数器:未收到client端发送的ping请求
- private int unRecPingTimes = 0 ;
- // 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象
- ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>();
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {
- System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
- // 收到ping消息后,回复
- if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){
- msg.setReqCode(Constants.RET_CODE);
- ctx.channel().writeAndFlush(msg);
- // 失败计数器清零
- unRecPingTimes = 0;
- if(localMsgInfo.get()==null){
- KeepAliveMessage localMsg = new KeepAliveMessage();
- localMsg.setSn(msg.getSn());
- localMsgInfo.set(localMsg);
- /*
- * 这里可以将设备号放入一个集合中进行统一管理
- */
- // TODO
- }
- }else{
- ctx.channel().close();
- }
- }
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- /*读超时*/
- System.out.println("===服务端===(READER_IDLE 读超时)");
- // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
- if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
- System.out.println("===服务端===(读超时,关闭chanel)");
- // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
- ctx.channel().close();
- }else{
- // 失败计数器加1
- unRecPingTimes++;
- }
- } else if (event.state() == IdleState.WRITER_IDLE) {
- /*写超时*/
- System.out.println("===服务端===(WRITER_IDLE 写超时)");
- } else if (event.state() == IdleState.ALL_IDLE) {
- /*总超时*/
- System.out.println("===服务端===(ALL_IDLE 总超时)");
- }
- }
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- System.out.println("错误原因:"+cause.getMessage());
- if(localMsgInfo.get()!=null){
- /*
- * 从管理集合中移除设备号等唯一标示,标示设备离线
- */
- // TODO
- }
- ctx.channel().close();
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("Client active ");
- super.channelActive(ctx);
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- // 关闭,等待重连
- ctx.close();
- if(localMsgInfo.get()!=null){
- /*
- * 从管理集合中移除设备号等唯一标示,标示设备离线
- */
- // TODO
- }
- System.out.println("===服务端===(客户端失效)");
- }
- }
- public void stopServer(){
- if(f!=null){
- f.channel().close();
- }
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- KeepAliveServer keepAliveServer = new KeepAliveServer(1666);
- keepAliveServer.startServer();
- }
- }
- 客户端代码:
- package com.kg.netty.client;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.SimpleChannelInboundHandler;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.serialization.ClassResolvers;
- import io.netty.handler.codec.serialization.ObjectDecoder;
- import io.netty.handler.codec.serialization.ObjectEncoder;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import io.netty.handler.timeout.IdleStateHandler;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- import com.kg.netty.msg.KeepAliveMessage;
- import com.kg.utils.Constants;
- public class KeepAliveClient {
- private String host ;
- private int port ;
- private EventLoopGroup group ;
- private Bootstrap b ;
- private Channel ch ;
- // 定义客户端没有收到服务端的pong消息的最大次数
- private static final int MAX_UN_REC_PONG_TIMES = 3;
- // 多长时间未请求后,发送心跳
- private static final int WRITE_WAIT_SECONDS = 5;
- // 隔N秒后重连
- private static final int RE_CONN_WAIT_SECONDS = 5;
- // 客户端连续N次没有收到服务端的pong消息 计数器
- private int unRecPongTimes = 0 ;
- private ScheduledExecutorService executorService ;
- // 是否停止
- private boolean isStop = false ;
- public KeepAliveClient(String host, int port) {
- this.host = host ;
- this.port = port ;
- group = new NioEventLoopGroup();
- b = new Bootstrap();
- b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());
- }
- public void start() {
- connServer();
- }
- private void connServer(){
- isStop = false;
- if(executorService!=null){
- executorService.shutdown();
- }
- executorService = Executors.newScheduledThreadPool(1);
- executorService.scheduleWithFixedDelay(new Runnable() {
- boolean isConnSucc = true;
- @Override
- public void run() {
- try {
- // 重置计数器
- unRecPongTimes = 0;
- // 连接服务端
- if(ch!=null&&ch.isOpen()){
- ch.close();
- }
- ch = b.connect(host, port).sync().channel();
- // 此方法会阻塞
- // ch.closeFuture().sync();
- System.out.println("connect server finish");
- } catch (Exception e) {
- e.printStackTrace();
- isConnSucc = false ;
- } finally{
- if(isConnSucc){
- if(executorService!=null){
- executorService.shutdown();
- }
- }
- }
- }
- }, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
- }
- public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
- pipeline.addLast("encoder", new ObjectEncoder());
- pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));
- // 客户端的逻辑
- pipeline.addLast("handler", new ClientHandler());
- }
- }
- public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)
- throws Exception {
- System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
- if (Constants.RET_CODE == msg.getReqCode()) {
- // 计数器清零
- unRecPongTimes = 0;
- }
- }
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("Client active ");
- super.channelActive(ctx);
- }
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("Client close ");
- super.channelInactive(ctx);
- /*
- * 重连
- */
- if(!isStop){
- connServer();
- }
- }
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- /*读超时*/
- System.out.println("===服务端===(READER_IDLE 读超时)");
- } else if (event.state() == IdleState.WRITER_IDLE) {
- /*写超时*/
- System.out.println("===服务端===(WRITER_IDLE 写超时)");
- if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){
- ctx.channel().writeAndFlush(getSrcMsg()) ;
- unRecPongTimes++;
- }else{
- ctx.channel().close();
- }
- } else if (event.state() == IdleState.ALL_IDLE) {
- /*总超时*/
- System.out.println("===服务端===(ALL_IDLE 总超时)");
- }
- }
- }
- }
- private KeepAliveMessage getSrcMsg(){
- KeepAliveMessage keepAliveMessage = new KeepAliveMessage();
- // 设备码
- keepAliveMessage.setSn("sn_123456abcdfef");
- keepAliveMessage.setReqCode(Constants.REQ_CODE);
- return keepAliveMessage ;
- }
- public void stop(){
- isStop = true;
- if(ch!=null&&ch.isOpen()){
- ch.close();
- }
- if(executorService!=null){
- executorService.shutdown();
- }
- }
- /**
- * @param args
- */
- public static void main(String[] args) {
- KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);
- keepAliveServer.start();
- }
- }
- 参考网站:
- http://coder.beitown.com/archives/1180
- 下载工程,请猛戳
- http://download.csdn.net/detail/asd13141718/8492741
最后
以上就是炙热乐曲为你收集整理的netty 心跳检测的全部内容,希望文章能够帮你解决netty 心跳检测所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复