概述
在使用netty 实现tcp时,出现数据大量的粘包和半包的情况,在需要准确的数据下做以下处理
private Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) {
if (bootstrap != null) {
final MyInboundHandler handler = new MyInboundHandler(this);
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
//解决粘包和半包问题 接收数据全部要以/r/n分割
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(handler);
socketChannel.pipeline().addLast(new OutBoundHandler());
}
}).remoteAddress(ip, port);
bootstrap.connect().addListener(new ConnectionListener(this));
}
return bootstrap;
}
这里是利用netty自带的编译规则
//解决粘包和半包问题 接收数据全部要以/r/n分割
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
注意这里实现原理是利用n 或者rn 分割数据,将上一次的数据若不全和下一次组合在一起成为完整数据,若发送的数据没有特殊符号则在接收里面无法接收到数据,最后附上TCP完整使用:
public synchronized void start() {
createBootstrap(new Bootstrap(), loop);
}
private Bootstrap createBootstrap(Bootstrap bootstrap, EventLoopGroup eventLoop) {
if (bootstrap != null) {
final MyInboundHandler handler = new MyInboundHandler(this);
bootstrap.group(eventLoop);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
//解决粘包和半包问题 接收数据全部要以/r/n分割
socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(handler);
socketChannel.pipeline().addLast(new OutBoundHandler());
}
}).remoteAddress(ip, port);
bootstrap.connect().addListener(new ConnectionListener(this));
}
return bootstrap;
}
/**
* 向服务器发送消息
*
* @param msg
* @throws Exception
*/
public boolean sendMsg(Object msg) throws Exception {
byte[] bytes = msg.toString().getBytes();
if (channel == null) {
return false;
}
if (!channel.isActive()) {
return false;
}
channel.writeAndFlush(bytes);
return true;
}
/**
* 发送消息给服务器
* @param msg 消息
*/
public boolean sendMsg(byte[] msg) {
if (channel == null) {
return false;
}
if (!channel.isActive()) {
return false;
}
channel.writeAndFlush(msg);
return true;
}
@AllArgsConstructor
class MyInboundHandler extends ChannelInboundHandlerAdapter {
private MessageClient messageClient;
/**
* 断开连接时的回调 断线重连
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("Message channel inactive:" + ctx.channel().remoteAddress());
channel = null;
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
messageClient.createBootstrap(new Bootstrap(), eventLoop);
}, 2L, TimeUnit.SECONDS);
super.channelInactive(ctx);
}
/**
* 建立连接时的回调
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Message channel active:" + ctx.channel().remoteAddress());
channel = ctx.channel();
} catch (Exception e) {
e.printStackTrace();
}
super.channelActive(ctx);
}
/**
* 接收到数据时调用
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String msg1 = (String) msg;
}
@AllArgsConstructor
class ConnectionListener implements ChannelFutureListener {
private MessageClient messageClient;
@Override
public void operationComplete(ChannelFuture channelFuture) {
if (!channelFuture.isSuccess()) {
log.info("restart");
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(() -> {
messageClient.createBootstrap(new Bootstrap(), loop);
}, 2L, TimeUnit.SECONDS);
} else {
log.info("message server started! port:" + port);
}
}
}
private class OutBoundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg instanceof byte[]) {
byte[] bytesWrite = (byte[])msg;
System.out.println(new String(bytesWrite));
ByteBuf buf = ctx.alloc().buffer(bytesWrite.length);
buf.writeBytes(bytesWrite);
ctx.writeAndFlush(buf).addListener((ChannelFutureListener) channelFuture -> System.out.println("send option success!"));
}
}
}
最后
以上就是发嗲荷花为你收集整理的JAVA Netty TCP 粘包/半包解决 以及遇到的问题(实现断线重连)的全部内容,希望文章能够帮你解决JAVA Netty TCP 粘包/半包解决 以及遇到的问题(实现断线重连)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复