我是靠谱客的博主 坦率纸飞机,这篇文章主要介绍Netty案例(五)之应用层心跳检测netty版本心跳检测,现在分享给大家,希望可以做个参考。

文章目录

  • netty版本
  • 心跳检测

netty版本

  1. netty版本:io.netty:netty-all:4.1.33.Final

心跳检测

  1. 心跳:即在TCP长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保TCP连接的有效性。

  2. 为什么需要心跳?

    • 因为网络的不可靠性, 有可能在TCP保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断。在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的。为了解决这个问题, 我们就需要引入心跳机制。
  3. 心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle(空闲)状态时, 客户端或服务器会发送一个特殊的数据包(一般都是自定义)给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据包回应发送方, 即一个PING-PONG交互。自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保TCP连接的有效性。一般情况下,心跳检测到连接失效后,还需要配合重连来进行进一步的重新连接。

  4. 心跳机制

    • 使用TCP协议层的keepalive机制
    • 应用层自定义心跳机制,这里主要讲应用层实现
  5. 在Netty中, 实现心跳机制的关键是IdleStateHandlerIdleStateHandler所产生的IdleStateEvent的处理逻辑。在userEventTriggered中, 根据 IdleStateEventstate()的不同, 而进行不同的处理。

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    /** * 心跳既可以从客户端发起也可以从服务端发起,此处从客户端发起 */ public class HeartbeatClient extends AbstractClient { // 读超时 private static final int READ_IDEL_TIME_OUT = 4; // 写超时 private static final int WRITE_IDEL_TIME_OUT = 5; // 所有超时 private static final int ALL_IDEL_TIME_OUT = 7; public HeartbeatClient(String host, int port) { super(host, port); } @Override public ChannelHandler[] getChannelHandlers() { return new ChannelHandler[]{ new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS), new HeartbeatClientHandler() }; } public static void main(String[] args) throws Throwable { new HeartbeatClient("127.0.0.1", 8080).start(); } } public class HeartbeatClientHandler extends SimpleChannelInboundHandler<String> { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatClientHandler.class); private int heartbeatCount = 0; /** * 丢失连接的次数,读,写,all */ private AtomicInteger read_loss_connect_times = new AtomicInteger(0); private AtomicInteger write_loss_connect_times = new AtomicInteger(0); private AtomicInteger all_loss_connect_times = new AtomicInteger(0); @Override protected void channelRead0(ChannelHandlerContext context, String msg) { //服务端回复PONG if ("PONG".equals(msg)) { read_loss_connect_times.getAndDecrement(); LOGGER.info("{} get pong msg from {}", context.channel().remoteAddress()); } } protected void sendPingMsg(ChannelHandlerContext context) { context.writeAndFlush("PING"); heartbeatCount++; LOGGER.info("sent ping msg to {} , count: ", context.channel().remoteAddress(), heartbeatCount); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { /*** * IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑. * 在 userEventTriggered 中, 根据 IdleStateEvent 的 state() 的不同, * 而进行不同的处理. 例如如果是读取数据 idle, 则 e.state() == READER_IDLE, * 因此就调用 handleReaderIdle 来处理它. */ if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE://读空闲 handleReaderIdle(ctx); break; case WRITER_IDLE: handleWriterIdle(ctx); break; case ALL_IDLE: handleAllIdle(ctx); break; default: break; } } else { super.userEventTriggered(ctx, evt); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("{} is active", ctx.channel().remoteAddress()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.info("{} is inactive", ctx.channel().remoteAddress()); } protected void handleReaderIdle(ChannelHandlerContext ctx) { sendPingMsg(ctx); LOGGER.info("读空闲,丢失连接的总次数:{}", read_loss_connect_times.getAndIncrement()); if (read_loss_connect_times.get() > 3) { LOGGER.info("已经超过{}次,关闭这个不活跃的连接", 3); ctx.channel().close(); } } protected void handleWriterIdle(ChannelHandlerContext ctx) { LOGGER.info("写空闲,丢失连接的总次数:{}", write_loss_connect_times.incrementAndGet()); } protected void handleAllIdle(ChannelHandlerContext ctx) { LOGGER.info("所有空闲,丢失连接的总次数:{}", all_loss_connect_times.incrementAndGet()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOGGER.error(cause.getMessage(), cause); ctx.close(); } }

最后

以上就是坦率纸飞机最近收集整理的关于Netty案例(五)之应用层心跳检测netty版本心跳检测的全部内容,更多相关Netty案例(五)之应用层心跳检测netty版本心跳检测内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部