概述
https://blog.csdn.net/haohaoxuexiyai/article/details/116497444
一、NettyClient
/**
* 实现了重连的客户端
*/
public class NettyClient {
private String host;
private int port;
private Bootstrap bootstrap;
private EventLoopGroup group;
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient("localhost", 9000);
nettyClient.connect();
}
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
init();
}
private void init() {
//客户端需要一个事件循环组
group = new NioEventLoopGroup();
//创建客户端启动对象
// bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入处理器
ch.pipeline().addLast(new NettyClientHandler(NettyClient.this));
}
});
}
public void connect() throws Exception {
System.out.println("netty client start。。");
//启动客户端去连接服务器端
ChannelFuture cf = bootstrap.connect(host, port);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
//重连交给后端线程执行
future.channel().eventLoop().schedule(() -> {
System.err.println("重连服务端...");
try {
connect();
} catch (Exception e) {
e.printStackTrace();
}
}, 3000, TimeUnit.MILLISECONDS);
} else {
System.out.println("服务端连接成功...");
}
}
});
//对通道关闭进行监听
cf.channel().closeFuture().sync();
}
}
二、NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private NettyClient nettyClient;
public NettyClientHandler(NettyClient nettyClient) {
this.nettyClient = nettyClient;
}
/**
* 当客户端连接服务器完成就会触发该方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
//当通道有读取事件时会触发,即服务端发送数据给客户端
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
}
// channel 处于不活动状态时调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.err.println("运行中断开重连。。。");
nettyClient.connect();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
三、NettyServer
public class NettyServer {
public static void main(String[] args) throws Exception {
// 创建两个线程组bossGroup和workerGroup, 含有的子线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
// 创建服务器端的启动对象
ServerBootstrap bootstrap = new ServerBootstrap();
// 使用链式编程来配置参数
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
// 使用NioServerSocketChannel作为服务器的通道实现
.channel(NioServerSocketChannel.class)
// 初始化服务器连接队列大小,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。
// 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {//创建通道初始化对象,设置初始化参数,在 SocketChannel 建立起来之前执行
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//对workerGroup的SocketChannel设置处理器
ch.pipeline().addLast(new LifeCycleInBoundHandler());
ch.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty server start。。");
// 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
// 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
ChannelFuture cf = bootstrap.bind(9000).sync();
// 给cf注册监听器,监听我们关心的事件
/*cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口9000成功");
} else {
System.out.println("监听端口9000失败");
}
}
});*/
// 等待服务端监听端口关闭,closeFuture是异步操作
// 通过sync方法同步等待通道关闭处理完毕,这里会阻塞等待通道关闭完成,内部调用的是Object的wait()方法
cf.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
四、NettyServerHandler
/**
* 自定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下文对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//Channel channel = ctx.channel();
//ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
}
/**
* 数据读取完毕处理方法
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.copiedBuffer("HelloClient".getBytes(CharsetUtil.UTF_8));
ctx.writeAndFlush(buf);
}
/**
* 处理异常, 一般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
五、LifeCycleInBoundHandler
/**
* handler的生命周期回调接口调用顺序:
* handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete
* -> channelInactive -> channelUnRegistered -> handlerRemoved
*
* handlerAdded: 新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调;
* channelRegistered: 当该连接分配到具体的worker线程后,该回调会被调用。
* channelActive:channel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了。
* channelRead:客户端向服务端发来数据,每次都会回调此方法,表示有数据可读;
* channelReadComplete:服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕;
* channelInactive:当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了。
* channelUnRegistered: 对应channelRegistered,当连接关闭后,释放绑定的workder线程;
* handlerRemoved: 对应handlerAdded,将handler从该channel的pipeline移除后的回调方法。
*/
public class LifeCycleInBoundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelRegistered: channel注册到NioEventLoop");
super.channelRegistered(ctx);
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelUnregistered: channel取消和NioEventLoop的绑定");
super.channelUnregistered(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelActive: channel准备就绪");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelInactive: channel被关闭");
super.channelInactive(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
System.out.println("channelRead: channel中有可读的数据" );
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
throws Exception {
System.out.println("channelReadComplete: channel读数据完成");
super.channelReadComplete(ctx);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerAdded: handler被添加到channel的pipeline");
super.handlerAdded(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx)
throws Exception {
System.out.println("handlerRemoved: handler从channel的pipeline中移除");
super.handlerRemoved(ctx);
}
}
最后
以上就是包容衬衫为你收集整理的用netty实现客户端断线重连的全部内容,希望文章能够帮你解决用netty实现客户端断线重连所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复