概述
1.理解netty的decode处理过程,2.netty的Bytebuf在decode时的处理过程,两个指针
netty是tcp包–>decode整理报文–>业务handler的过程。decode如果list.add()但是没有读取报文,报DecoderException说你decode但是没有读取任何报文,这是netty为了防止自己开发decode出现bug
ByteBuf.readBytes(0)会返回一个空bytebuf导致
list.add(ByteBuf.readBytes(0))
报错decode() did not read anything but decoded a message
要理解netty的Bytebuf,有两个指针readerIndex,writerIndex来记录你的decode的读与netty的报文写入,可以看看这个
一起学Netty(五)之 初识ByteBuf和ByteBuf的常用API
decode中list.add(msg)把你decode整理的一个完整报文放入list给业务handler处理。由于异步,会先调用多次decode,然后是多次的业务handler(MyHandler), MyHandler中的channelRead的Object msg就是你out.add()加入的一个报文
如果报文中有其他冗余的信息,可以在decode中设置ByteBuf的readerIndex去略过这个信息,或者可以设置一个waste的 byte数组,让ByteBuf.reads…(waste),但是waste不加入list,就是空读,只读不用,来清除冗余信息,也可以在list.add()前,加入一些信息到报文中
Test.java 提供模拟tcp接口数据的源,可以自己定义byte[]或者保存的tcp接口的报文写入的文件
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
public class Test
{
public static void main(String[] args) throws Exception
{
ServerSocket serverSocket = new ServerSocket(10086);
while (true){
Socket socket = serverSocket.accept();
System.out.println("accept");
// byte[] resp = ("a@$@bbbbb@$@cccccccccc@$@ddddddddddddddddddddddddddddddddddddddd@$@33333333333@$@4 v@$@").getBytes();
// InputStream inputStream = new FileInputStream(new File("D:/proj/netty/tcpradio.dat"));
InputStream inputStream = new FileInputStream(new File("tcpradio.dat"));
OutputStream outputStream = socket.getOutputStream();
byte[] readBuf = new byte[10240];
int readLen;
readLen= inputStream.read(readBuf,0,10240);
System.out.println(readLen);
outputStream.write(readBuf,0,readLen);
inputStream.close();
outputStream.close();
}
}
}
TcpInterface.java netty的启动类
package com.tgram.api2Mq;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
public class TcpInterface {
public void connect(int port,String host) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyDecodeHandler());
ch.pipeline().addLast(new MyHandler());
}
});
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
TcpInterface tcpInterface = new TcpInterface();
tcpInterface.connect(10086,"127.0.0.1");
}
}
MyDecodeHandler.java 自定义解码拼装成一个完整报文(就是我们定义的一个完整的信息报文,不是tcp的底层一个报文,tcp的报文是分片,是一串字节流)
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class MyDecodeHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
//取到tag与报文长度后再处理,各2字节
if(byteBuf.readableBytes()<4){
return;
}
//记录当前ByteBuf的读指针位置,以便下面取报文长度字节
//pos是一个完整报文的开始位置,报文整体会在ByteBuf中移动,类似内存管理,所以基于字节的判断报文长度等等,都是基于pos,否则可以在byteBuf.readBytes()之后加,byteBuf.discardReadBytes();整理ByteBuf,使pos回到0开始位置
int pos = byteBuf.readerIndex();
int msgLen = ((byteBuf.getByte(pos +3) &0xFF)<<8) | (byteBuf.getByte(pos+2) &0xFF);
//收到的报文长度不足一个完整的报文,继续接收
if(byteBuf.readableBytes()<msgLen){
return;
}
//提出完整报文(readBytes读到msg中),放到list给下一个Handler处理
if(msgLen>0){
list.add(msg);
}
}
}
MyHandler.java 正式处理解码后的报文
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
public class MyHandler extends ChannelInboundHandlerAdapter
{
private int count = 0;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ByteBuf byteBuf = (ByteBuf) msg;
// 判断报文长度
int bufLen = byteBuf.readableBytes();
//当前处理的是第几个字节开始的报文
System.out.println("pos:" + count);
//统计已处理的所有报文字节长度
count += bufLen;
System.out.println("msg:" + bufLen);
//业务的报文处理
//必须释放,如果继承simplechannelinboundhandler会自动释放,但是报文处理写在channelRead0
ReferenceCountUtil.release(msg)
}
}
最后
以上就是懵懂菠萝为你收集整理的netty解析自定义长度的tcp报文--java处理tcp接口数据的全部内容,希望文章能够帮你解决netty解析自定义长度的tcp报文--java处理tcp接口数据所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复