概述
心跳定义
心跳即在TCP长连接中,客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线,以确保TCP连接的有效性。
为什么需要心跳
因为网络的不可靠性,有可能在TCP保持长连接的过程中,由于某些突发情况,例如网线被拔出,突然掉电等,会造成服务器和客户端的连接中断。在这些突发情况下,如果恰好服务器和客户端之间没有交互的话,那么它们是不能在短时间内发现对方已经掉线的.。为了解决这个问题,我们就需要引入心跳机制。心跳机制的工作原理是:在服务器和客户端之间一定时间内没有数据交互时,即处于idle状态时,客户端或服务器会发送一个特殊的数据包给对方,当接收方收到这个数据报文后,也立即发送一个特殊的数据报文,回应发送方,此时就完成了一次PING-PONG的交互。自然地,当某一端收到心跳消息后,就知道了对方仍然在线,这就确保TCP连接的有效性。
Netty中,实现心跳机制的关键是IdleStateHandler,它可以对一个channel的读/写设置定时器,当channel在一定事件间隔内没有数据交互时(即处于idle状态),就会触发指定的事件。
有关IdleStateHandler原理分析:https://www.cnblogs.com/duan2/p/8919458.html
实例演示
客户端和服务器通过TCP长连接进行通信,TCP报文格式如下:
+--------+-----+---------------+
| Length |Type | Content |
| 17 | 1 |"HELLO, WORLD" |
+--------+-----+---------------+
- 客户端每隔一个随机的时间后, 向服务器发送消息, 服务器收到消息后, 立即将收到的消息原封不动地回复给客户端。
- 若客户端在指定的时间间隔内没有读/写操作,则客户端会自动向服务器发送一个PING心跳,服务器收到PING心跳消息时,需要回复一个PONG消息。
CustomHeartbeatHandler捕获IdleStateHandler触发的IdleStateEvent事件,在userEventTriggered中接收不同类型的事件做出相应的处理,并且负责心跳的发送和接收。
public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
public static final byte PING_MSG = 1;
public static final byte PONG_MSG = 2;
public static final byte DATA_MSG = 3;
protected String name;
private int heartbeatCount = 0;
public CustomHeartbeatHandler(String name) {
this.name = name;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
if (byteBuf.getByte(4) == PING_MSG) {
sendPongMsg(ctx);
} else if (byteBuf.getByte(4) == PONG_MSG) {
System.out.println(name
+ " get pong msg from " + ctx.channel().remoteAddress());
} else {
handleData(ctx, byteBuf);
}
}
protected void sendPingMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
buf.writeInt(Constant.PACK_BASE_LENGTH); // 长度域长4 + 消息类型1
buf.writeByte(PING_MSG);
context.writeAndFlush(buf);
heartbeatCount++;
System.out.println(name
+ " sent ping msg to " + context.channel().remoteAddress()
+ " count: " + heartbeatCount
+ " current time: " + DateUtil.dateFormat());
}
private void sendPongMsg(ChannelHandlerContext context) {
ByteBuf buf = context.alloc().buffer(Constant.PACK_BASE_LENGTH);
buf.writeInt(Constant.PACK_BASE_LENGTH); // 长度域长4 + 消息类型1
buf.writeByte(PONG_MSG);
context.channel().writeAndFlush(buf);
heartbeatCount++;
System.out.println(name
+ " sent pong msg to " + context.channel().remoteAddress()
+ " count: " + heartbeatCount
+ " current time: " + DateUtil.dateFormat());
}
protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case READER_IDLE:
handleReaderIdle(ctx);
break;
case WRITER_IDLE:
handleWriterIdle(ctx);
break;
case ALL_IDLE:
handleAllIdle(ctx);
break;
default:
break;
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.err.println("--- " + ctx.channel().remoteAddress() + " is active ---");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("--- " + ctx.channel().remoteAddress() + " is inactive ---");
}
protected void handleReaderIdle(ChannelHandlerContext ctx) {
System.err.println("--- READER_IDLE ---");
}
protected void handleWriterIdle(ChannelHandlerContext ctx) {
System.err.println("--- WRITER_IDLE ---");
}
protected void handleAllIdle(ChannelHandlerContext ctx) {
System.err.println("--- ALL_IDLE ---");
}
@Override
protected void messageReceived(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
}
}
客户端handler
接收服务端相应数据,读写空闲发送ping包,tcp断线重连。
public class ClientHandler extends CustomHeartbeatHandler {
private NettyClient client;
public ClientHandler(NettyClient client) {
super("client");
this.client = client;
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - Constant.PACK_BASE_LENGTH];
byteBuf.skipBytes(Constant.PACK_BASE_LENGTH);
byteBuf.readBytes(data);
System.out.println(name
+ " receive message: " + new String(data));
}
@Override
protected void handleAllIdle(ChannelHandlerContext ctx) {
super.handleAllIdle(ctx);
sendPingMsg(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
client.doConnect(); // 断开tcp连接后重连
}
}
客户端启动类
public class NettyClient {
private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
private Channel channel;
private Bootstrap bootstrap;
public void start() {
try {
bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(0, 0, 5));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ClientHandler(NettyClient.this));
}
});
this.doConnect();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected void doConnect() {
if (channel != null && channel.isActive()) {
return;
}
ChannelFuture future = bootstrap.connect("127.0.0.1", 9527);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
channel = futureListener.channel();
System.out.println("Connect to server successfully!");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConnect();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
public void sendData() throws Exception {
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < 10000; i++) {
if (channel != null && channel.isActive()) {
String content = "client req " + i;
ByteBuf buf = channel.alloc().buffer(Constant.PACK_BASE_LENGTH + content.getBytes().length);
buf.writeInt(Constant.PACK_BASE_LENGTH + content.getBytes().length);
buf.writeByte(CustomHeartbeatHandler.DATA_MSG);
buf.writeBytes(content.getBytes());
channel.writeAndFlush(buf);
System.out.println("client send message: " + content
+ " current time: " + DateUtil.dateFormat());
}
Thread.sleep(random.nextInt(20000));
}
}
public static void main(String[] args) throws Exception {
NettyClient client = new NettyClient();
client.start();
client.sendData();
}
}
服务端handler
接收客户端消息,读空闲时断开tcp连接
public class ServerHandler extends CustomHeartbeatHandler {
public ServerHandler() {
super("server");
}
@Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
byte[] data = new byte[buf.readableBytes() - Constant.PACK_BASE_LENGTH];
ByteBuf respBuf = Unpooled.copiedBuffer(buf);
buf.skipBytes(Constant.PACK_BASE_LENGTH);
buf.readBytes(data);
System.out.println(name
+ " receive message: " + new String(data)
+ " current time: " + DateUtil.dateFormat());
channelHandlerContext.write(respBuf);
}
@Override
protected void handleReaderIdle(ChannelHandlerContext ctx) {
super.handleReaderIdle(ctx);
System.err.println("--- client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it ---");
ctx.close();
}
}
服务端启动类
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new IdleStateHandler(10, 0, 0));
p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
p.addLast(new ServerHandler());
}
});
Channel ch = bootstrap.bind(9527).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
最后
以上就是闪闪万宝路为你收集整理的Netty之心跳重连的全部内容,希望文章能够帮你解决Netty之心跳重连所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复