我是靠谱客的博主 繁荣自行车,这篇文章主要介绍基于netty的心跳检测,现在分享给大家,希望可以做个参考。

这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能。

实现的功能:

1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;

2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;

3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;

4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;

5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;

6.只要收到客户端的ping消息,服务端心跳失败计数器清零;

7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;


服务端代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package com.kg.netty.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; import com.kg.netty.msg.KeepAliveMessage; import com.kg.utils.Constants; import com.kg.utils.Utils; public class KeepAliveServer { // 端口 private int port ; public KeepAliveServer(int port) { this.port = port; } ChannelFuture f ; ServerBootstrap b ; // 设置6秒检测chanel是否接受过心跳数据 private static final int READ_WAIT_SECONDS = 6; // 定义客户端没有收到服务端的pong消息的最大次数 private static final int MAX_UN_REC_PING_TIMES = 3; public void startServer() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new KeepAliveServerInitializer()); // 服务器绑定端口监听 f = b.bind(port).sync(); // 监听服务器关闭监听,此方法会阻塞 f.channel().closeFuture().sync(); // 可以简写为 /* b.bind(portNumber).sync().channel().closeFuture().sync(); */ } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 消息处理器 * @author cullen edward */ private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /* * 使用ObjectDecoder和ObjectEncoder * 因为双向都有写数据和读数据,所以这里需要两个都设置 * 如果只读,那么只需要ObjectDecoder即可 */ pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); pipeline.addLast("encoder", new ObjectEncoder()); /* * 这里只监听读操作 * 可以根据需求,监听写操作和总得操作 */ pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS)); pipeline.addLast("handler", new Heartbeat()); } } private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> { // 失败计数器:未收到client端发送的ping请求 private int unRecPingTimes = 0 ; // 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象 ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>(); @Override protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception { System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode()); // 收到ping消息后,回复 if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){ msg.setReqCode(Constants.RET_CODE); ctx.channel().writeAndFlush(msg); // 失败计数器清零 unRecPingTimes = 0; if(localMsgInfo.get()==null){ KeepAliveMessage localMsg = new KeepAliveMessage(); localMsg.setSn(msg.getSn()); localMsgInfo.set(localMsg); /* * 这里可以将设备号放入一个集合中进行统一管理 */ // TODO } }else{ ctx.channel().close(); } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { /*读超时*/ System.out.println("===服务端===(READER_IDLE 读超时)"); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){ System.out.println("===服务端===(读超时,关闭chanel)"); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.channel().close(); }else{ // 失败计数器加1 unRecPingTimes++; } } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ System.out.println("===服务端===(WRITER_IDLE 写超时)"); } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println("===服务端===(ALL_IDLE 总超时)"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("错误原因:"+cause.getMessage()); if(localMsgInfo.get()!=null){ /* * 从管理集合中移除设备号等唯一标示,标示设备离线 */ // TODO } ctx.channel().close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client active "); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 关闭,等待重连 ctx.close(); if(localMsgInfo.get()!=null){ /* * 从管理集合中移除设备号等唯一标示,标示设备离线 */ // TODO } System.out.println("===服务端===(客户端失效)"); } } public void stopServer(){ if(f!=null){ f.channel().close(); } } /** * @param args */ public static void main(String[] args) { KeepAliveServer keepAliveServer = new KeepAliveServer(1666); keepAliveServer.startServer(); } }



客户端代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package com.kg.netty.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.kg.netty.msg.KeepAliveMessage; import com.kg.utils.Constants; public class KeepAliveClient { private String host ; private int port ; private EventLoopGroup group ; private Bootstrap b ; private Channel ch ; // 定义客户端没有收到服务端的pong消息的最大次数 private static final int MAX_UN_REC_PONG_TIMES = 3; // 多长时间未请求后,发送心跳 private static final int WRITE_WAIT_SECONDS = 5; // 隔N秒后重连 private static final int RE_CONN_WAIT_SECONDS = 5; // 客户端连续N次没有收到服务端的pong消息 计数器 private int unRecPongTimes = 0 ; private ScheduledExecutorService executorService ; // 是否停止 private boolean isStop = false ; public KeepAliveClient(String host, int port) { this.host = host ; this.port = port ; group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer()); } public void start() { connServer(); } private void connServer(){ isStop = false; if(executorService!=null){ executorService.shutdown(); } executorService = Executors.newScheduledThreadPool(1); executorService.scheduleWithFixedDelay(new Runnable() { boolean isConnSucc = true; @Override public void run() { try { // 重置计数器 unRecPongTimes = 0; // 连接服务端 if(ch!=null&&ch.isOpen()){ ch.close(); } ch = b.connect(host, port).sync().channel(); // 此方法会阻塞 // ch.closeFuture().sync(); System.out.println("connect server finish"); } catch (Exception e) { e.printStackTrace(); isConnSucc = false ; } finally{ if(isConnSucc){ if(executorService!=null){ executorService.shutdown(); } } } } }, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS); } public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); pipeline.addLast("encoder", new ObjectEncoder()); pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS)); // 客户端的逻辑 pipeline.addLast("handler", new ClientHandler()); } } public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> { @Override protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception { System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode()); if (Constants.RET_CODE == msg.getReqCode()) { // 计数器清零 unRecPongTimes = 0; } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client active "); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client close "); super.channelInactive(ctx); /* * 重连 */ if(!isStop){ connServer(); } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { /*读超时*/ System.out.println("===服务端===(READER_IDLE 读超时)"); } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ System.out.println("===服务端===(WRITER_IDLE 写超时)"); if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){ ctx.channel().writeAndFlush(getSrcMsg()) ; unRecPongTimes++; }else{ ctx.channel().close(); } } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println("===服务端===(ALL_IDLE 总超时)"); } } } } private KeepAliveMessage getSrcMsg(){ KeepAliveMessage keepAliveMessage = new KeepAliveMessage(); // 设备码 keepAliveMessage.setSn("sn_123456abcdfef"); keepAliveMessage.setReqCode(Constants.REQ_CODE); return keepAliveMessage ; } public void stop(){ isStop = true; if(ch!=null&&ch.isOpen()){ ch.close(); } if(executorService!=null){ executorService.shutdown(); } } /** * @param args */ public static void main(String[] args) { KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666); keepAliveServer.start(); } }

参考网站:

http://coder.beitown.com/archives/1180


下载工程,请猛戳

http://download.csdn.net/detail/asd13141718/8492741

最后

以上就是繁荣自行车最近收集整理的关于基于netty的心跳检测的全部内容,更多相关基于netty内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部