概述
场景
物联网关服务器,需要不断采集数据并上报,主备2台服务器同时部署应用程序,但是只能一台服务器启动服务,即主服务器启动服务,备服务器停止服务,如果主服务器挂了,备服务器马上启动,并推送主服务器挂了的消息给运维修复,这样的话,就需要主备之间建立心跳监测,来实时的主备之间的服务状态。
源码
在Netty4中,有IdleStateHandler这个类,他是netty内部提供的用于检测连接有效性的,他可以对三种类型的心跳检测,当有连接空闲的时候,或者超过设定之间没有读写的时候,就会触犯一个IdleStateEvent事件,然后在自定义的handler中重写userEventTriggered这个方法,进行后续的处理。
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
- readerIdleTime:读超时
- writerIdleTime:写超时
- allIdleTime:所有类型的超时
同时也有3个task的定时任务,分别是读,写,所有类型的定时任务
当IdleStateHandler被添加到pipeline中,就会调用initialize这个方法,
private void initialize(ChannelHandlerContext ctx) {
switch (state) {
case 1:
case 2:
return;
}
state = 1;
EventExecutor loop = ctx.executor();
lastReadTime = lastWriteTime = System.nanoTime();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = loop.schedule(
new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = loop.schedule(
new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = loop.schedule(
new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
当readerIdleTimeNanos ,writerIdleTimeNanos ,allIdleTimeNanos 这3个参数 > 0时,就会创建定时任务,下面我们来看看这3个定时任务的源码,由于这3个定时任务逻辑是一样的,所以只看其中的一个即可,以ReaderIdleTimeoutTask 为例
private final class ReaderIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx;
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
}
long currentTime = System.nanoTime();
long lastReadTime = IdleStateHandler.this.lastReadTime;
long nextDelay = readerIdleTimeNanos - (currentTime - lastReadTime);
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout =
ctx.executor().schedule(this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
try {
IdleStateEvent event;
if (firstReaderIdleEvent) {
firstReaderIdleEvent = false;
event = IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT;
} else {
event = IdleStateEvent.READER_IDLE_STATE_EVENT;
}
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = ctx.executor().schedule(this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
这个定时器的主要意思是这样:
- 首先获取当前系统时间currentTime
- 然后获取上一次读的时间lastReadTime
- 然后用设定的读超时时间nextDelay(假如是5秒)减去上面2个数据的差值(currentTime - lastReadTime)
- 假如这个差值是6秒,那么nextDelay = -1秒,就说明5秒内没有读的连接,读超时了
- 这里会把 firstReaderIdleEvent属性为 false ,表示,下一次读取不再是第一次了,不会重复触发事件
- 然后会创建一个 IdleStateEvent 类型的写事件对象,将此对象传递给用户的 UserEventTriggered 方法。完成触发事件的操作
以上就是IdleStateHandler类中的核心源码,简单总结一下就是这个类会根据你设置的超时参数的类型和值,循环去检测channelRead和write方法多久没有被调用了,如果这个时间超过了你设置的值,那么就会触发对应的事件,read触发read的定时任务,write触发write的定时任务,all触发all的定时任务;如果超时了,则会调用userEventTriggered方法,且会告诉你超时的类型;如果没有超时,则会循环定时检测,除非你将IdleStateHandler从Pipeline中移除
实现
首先需要准备socket 服务端,和 socket客户端,然后服务端部署在主服务器上,客户端部署在备服务器上,然后建立连接,例如每5秒钟客户端就想 服务端发送心跳包,如果超过5秒钟没有收到心跳包,视为断线,然后进行3次断线重连,如果这3次断线重连 都失败了,就确认断线了,然后进行下一步处理。
核心代码-服务端
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.weakCachingConcurrentResolver(null)));
pipeline.addLast("encoder", new ObjectEncoder());
//心跳检测
logger.info("心跳监测开始!");
pipeline.addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS));
pipeline.addLast("handler", new ServerHeartBeatHandler());
}
首先需要把IdleStateHandler加入到pipeline中,对应的3个long型的时间变量,带上单位
然后自定义一个ServerHeartBeatHandler继承SimpleChannelInboundHandler,同时也加入到pipeline中,会重写父类中的4个方法,分别是
- channelRead0:建立连接之前调用方法
- channelActive:建立连接之后调用方法
- channelInactive:断开连接调用方法
- userEventTriggered:用户事件处理方法,就是连接中断之后应该进行哪些操作
ServerHeartBeatHandler中还写了个handshakeAuth握手鉴权的方法,处于安全考虑,可以自定义一些握手的认证,例如可以加入客户端的ip端口号验证,秘钥验证等等,只有握手认证成功之后才能建立连接。
private boolean handshakeAuth(ChannelHandlerContext ctx, Object msg){
logger.info("服务端开始验证auth");
HeartBeatAuthVo vo = (HeartBeatAuthVo) msg;
String auth = redisService.get(Contant.SOCKET_HEART_BEAT_AUTH).toString();
String ip = CustomConfiguration.nettyHeartBeatClientHost;
if(vo.getAuthKey().equals(auth) && vo.getClientHost().equals(ip)){
logger.info("服务端心跳握手成功");
ctx.writeAndFlush(Contant.AUTH_SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush(Contant.AUTH_ERROR_KEY).addListener(ChannelFutureListener.CLOSE);
logger.error("服务端心跳握手失败");
return false;
}
}
核心代码-客户端
和服务端一样,也需要把IdleStateHandler加入到pipeline中,对应的3个long型的时间变量,带上单位,然后新建NettyClientHandler类也需要加入到pipeline中
和服务端一样,重写父类中的4个方法,只不过不同的是channelInactive这个方法中,需要加入断线重连的逻辑,如果断线了,就会创建个轮询任务,定时的去重连,例如每10秒尝试重连一次
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
handshakeAuth = false;
logger.info("断线了!");
nettyClinet = new NettyClinet();
final EventLoop loop = ctx.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
logger.info("服务端链接不上,开始重连操作...");
nettyClinet.startClient();
}
}, 10, TimeUnit.SECONDS);
ctx.fireChannelInactive();
}
然后需要在client中加入 重连监听,监听到底重连成功了没有,如果重连了3次都失败了,就放弃
,关闭客户端
public void reConnectionListener(){
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (!channelFuture.isSuccess()) {
final EventLoop loop = channelFuture.channel().eventLoop();
ScheduledFuture sf = loop.schedule(new Runnable() {
@Override
public void run() {
logger.info("服务端链接不上,开始第" + reConnectionCount + "次重连操作...");
startClient();
reConnectionCount ++;
}
}, CustomConfiguration.nettyReconnectionTime, TimeUnit.SECONDS);
if(reConnectionCount > CustomConfiguration.nettyMaxReconnectionCount){
//中断执行此任务的线程
sf.cancel(true);
//优雅的关闭
stopClient();
logger.info("重连失败,推送掉线信息!");
//这里可以进行下一步操作,例如消息推送,等等
}
}
}
});
}
最后
利用心跳监测,可以实时的了解主服务的情况,如果一旦发生问题,心跳中断了,就马上启动备用服务,实现系统的高可用。本次分享就到这里了,如果有不对的地方欢迎指出,也可以留言讨论!
最后
以上就是舒心楼房为你收集整理的基于netty实现双机热备心跳监测场景源码实现核心代码-服务端核心代码-客户端最后的全部内容,希望文章能够帮你解决基于netty实现双机热备心跳监测场景源码实现核心代码-服务端核心代码-客户端最后所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复