概述
资源下载:https://download.csdn.net/download/qq_39246466/15491561
1.目录格式
定义常量
package com.bfxy.netty.protocol;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public final class NettyConstant {
public static final String REMOTEIP = "127.0.0.1";//远程
public static final int PORT = 8080;//远程端口
public static final int LOCAL_PORT = 12088;//本地端口
public static final String LOCALIP = "127.0.0.1";//本地
}
定义消息类型
package com.bfxy.netty.protocol;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public enum MessageType {
SERVICE_REQ((byte) 0), // 业务请求消息
SERVICE_RESP((byte) 1), // 业务相应消息
ONE_WAY((byte) 2), // 业务ONE WAY 既是请求又是响应消息
LOGIN_REQ((byte) 3), // 握手请求消息
LOGIN_RESP((byte) 4), // 握手响应消息
HEARTBEAT_REQ((byte) 5), // 心跳请求消息
HEARTBEAT_RESP((byte) 6); // 心跳响应消息
private byte value;
private MessageType(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
}
实体类定义
package com.bfxy.netty.struct;
/**
* @author lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
public final class NettyMessage {
private Header header;
private Object body;
/**
* @return the header
*/
public final Header getHeader() {
return header;
}
/**
* @param header
* the header to set
*/
public final void setHeader(Header header) {
this.header = header;
}
/**
* @return the body
*/
public final Object getBody() {
return body;
}
/**
* @param body
* the body to set
*/
public final void setBody(Object body) {
this.body = body;
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "NettyMessage [header=" + header + "]";
}
}
消息头部:自定义协议解析需要遵循的规则
package com.bfxy.netty.struct;
import java.util.HashMap;
import java.util.Map;
/**
* @author Lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
public final class Header {
private int crcCode = 0xabef0101; //crc16: 4
private int length;// 消息长度 4
private long sessionID;// 会话ID 8
private byte type;// 消息类型 1
private byte priority;// 消息优先级 1
private Map<String, Object> attachment = new HashMap<String, Object>(); // 附件
/**
* @return the crcCode
*/
public final int getCrcCode() {
return crcCode;
}
/**
* @param crcCode
* the crcCode to set
*/
public final void setCrcCode(int crcCode) {
this.crcCode = crcCode;
}
/**
* @return the length
*/
public final int getLength() {
return length;
}
/**
* @param length
* the length to set
*/
public final void setLength(int length) {
this.length = length;
}
/**
* @return the sessionID
*/
public final long getSessionID() {
return sessionID;
}
/**
* @param sessionID
* the sessionID to set
*/
public final void setSessionID(long sessionID) {
this.sessionID = sessionID;
}
/**
* @return the type
*/
public final byte getType() {
return type;
}
/**
* @param type
* the type to set
*/
public final void setType(byte type) {
this.type = type;
}
/**
* @return the priority
*/
public final byte getPriority() {
return priority;
}
/**
* @param priority
* the priority to set
*/
public final void setPriority(byte priority) {
this.priority = priority;
}
/**
* @return the attachment
*/
public final Map<String, Object> getAttachment() {
return attachment;
}
/**
* @param attachment
* the attachment to set
*/
public final void setAttachment(Map<String, Object> attachment) {
this.attachment = attachment;
}
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
return "Header [crcCode=" + crcCode + ", length=" + length
+ ", sessionID=" + sessionID + ", type=" + type + ", priority="
+ priority + ", attachment=" + attachment + "]";
}
}
sever端:
package com.bfxy.netty.server;
import java.io.IOException;
import com.bfxy.netty.codec.NettyMessageDecoder;
import com.bfxy.netty.codec.NettyMessageEncoder;
import com.bfxy.netty.protocol.NettyConstant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public class NettyServer {
public void bind() throws Exception {
// 配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws IOException {
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatRespHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture cf = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
System.out.println("Netty server start ok : " + (NettyConstant.REMOTEIP + " : " + NettyConstant.PORT));
cf.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
new NettyServer().bind();
}
}
Handler:登录验证
package com.bfxy.netty.server;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.bfxy.netty.protocol.MessageType;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {
private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
private String[] whitekList = { "127.0.0.1", "192.168.1.200" };
/**
* Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward to
* the next {@link ChannelHandler} in the {@link ChannelPipeline}.
*
* Sub-classes may override this method to change behavior.
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手请求消息,处理,其它消息透传
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_REQ
.value()) {
String nodeIndex = ctx.channel().remoteAddress().toString();
NettyMessage loginResp = null;
// 重复登陆,拒绝
if (nodeCheck.containsKey(nodeIndex)) {
loginResp = buildResponse((byte) -1);
} else {
InetSocketAddress address = (InetSocketAddress) ctx.channel()
.remoteAddress();
String ip = address.getAddress().getHostAddress();
boolean isOK = false;
for (String WIP : whitekList) {
if (WIP.equals(ip)) {
isOK = true;
break;
}
}
loginResp = isOK ? buildResponse((byte) 0)
: buildResponse((byte) -1);
if (isOK)
nodeCheck.put(nodeIndex, true);
}
System.out.println("The login response is : " + loginResp
+ " body [" + loginResp.getBody() + "]");
ctx.writeAndFlush(loginResp);
} else {
ctx.fireChannelRead(msg);
}
}
private NettyMessage buildResponse(byte result) {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.LOGIN_RESP.value());
message.setHeader(header);
message.setBody(result);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
Handler:心跳检测
package com.bfxy.netty.server;
import com.bfxy.netty.protocol.MessageType;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 返回心跳应答消息
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HEARTBEAT_REQ
.value()) {
System.out.println("Receive client heart beat message : ---> "
+ message);
NettyMessage heartBeat = buildHeatBeat();
System.out
.println("Send heart beat response message to client : ---> "
+ heartBeat);
ctx.writeAndFlush(heartBeat);
} else
ctx.fireChannelRead(msg);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.HEARTBEAT_RESP.value());
message.setHeader(header);
return message;
}
}
Client端:
package com.bfxy.netty.client;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.bfxy.netty.codec.NettyMessageDecoder;
import com.bfxy.netty.codec.NettyMessageEncoder;
import com.bfxy.netty.protocol.NettyConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
* 查找进程:
* tasklist | findstr 2000
* netstat -ano
* windows 杀掉进程命令: taskkill -PID 进程号
*/
public class NettyClient {
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
EventLoopGroup group = new NioEventLoopGroup();
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast("MessageEncoder", new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
//ch.pipeline().addLast("HeartBeatHandler", new SeverHander());
}
});
// 发起异步连接操作
ChannelFuture future = b.connect(
new InetSocketAddress(host, port),
new InetSocketAddress(NettyConstant.LOCALIP,
NettyConstant.LOCAL_PORT)).sync();
System.out.println("Client Start.. ");
future.channel().closeFuture().sync();
} finally {
// 所有资源释放完成之后,清空资源,再次发起重连操作
executor.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
try {
connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
}
}
HeartBeatReqHandler:
package com.bfxy.netty.client;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import com.bfxy.netty.protocol.MessageType;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {
private volatile ScheduledFuture<?> heartBeat;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
// 握手成功,主动发送心跳消息
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
this.heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
} else if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value()) {
System.out.println("Client receive server heart beat message : ---> " + message);
} else {
ctx.fireChannelRead(msg);
}
}
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
NettyMessage heatBeat = buildHeatBeat();
System.out.println("Client send heart beat messsage to server : ---> " + heatBeat);
ctx.writeAndFlush(heatBeat);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.HEARTBEAT_REQ.value());
message.setHeader(header);
return message;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
LoginAuthReqHandler:
package com.bfxy.netty.client;
import com.bfxy.netty.protocol.MessageType;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildLoginReq());
}
/**
* <B>方法名称:</B>握手请求信息<BR>
* <B>概要说明:</B>握手请求信息<BR>
* @return
*/
private NettyMessage buildLoginReq() {
//创建消息对象
NettyMessage message = new NettyMessage();
//创建Header
Header header = new Header();
//设置Header的握手请求消息
header.setType(MessageType.LOGIN_REQ.value());
message.setHeader(header);
return message;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手应答消息,需要判断是否认证成功
if(message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_RESP.value()) {
byte loginResult = (byte) message.getBody();
if (loginResult != (byte) 0) {
ctx.close(); // 握手失败,关闭连接
} else {
System.out.println("Login is ok : " + message);
ctx.fireChannelRead(msg);
}
} else {
ctx.fireChannelRead(msg);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
编解码协议:
ChannelBufferByteInput
ChannelBufferByteOutput
MarshallingCodecFactory
MarshallingDecoder
MarshallingEncoder
NettyMessageDecoder
NettyMessageEncoder
TestCodeC
NettyMessageEncoder:
package com.bfxy.netty.codec;
import java.io.IOException;
import java.util.Map;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
* @author Lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
public final class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
MarshallingEncoder marshallingEncoder;
public NettyMessageEncoder() throws IOException {
this.marshallingEncoder = new MarshallingEncoder();
}
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
if(msg == null || msg.getHeader() == null) {
throw new Exception("The encode message is null");
}
//写入包头,起始位置为4 ==> 占4个字节位置
sendBuf.writeInt((msg.getHeader().getCrcCode()));
//数据长度 ==> 占4个字节位置
sendBuf.writeInt((msg.getHeader().getLength()));
//开始写入数据 ==> 占8个字节位置
sendBuf.writeLong((msg.getHeader().getSessionID()));
//==> 占1个字节位置
sendBuf.writeByte((msg.getHeader().getType()));
//==> 占1个字节位置
sendBuf.writeByte((msg.getHeader().getPriority()));
//==> 占4个字节位置
sendBuf.writeInt((msg.getHeader().getAttachment().size()));
String key = null;
byte[] keyArray = null;
Object value = null;
for (Map.Entry<String, Object> param : msg.getHeader().getAttachment().entrySet()) {
key = param.getKey();
keyArray = key.getBytes("UTF-8");
sendBuf.writeInt(keyArray.length);
sendBuf.writeBytes(keyArray);
value = param.getValue();
this.marshallingEncoder.encode(value, sendBuf);
}
key = null;
keyArray = null;
value = null;
if (msg.getBody() != null) {
marshallingEncoder.encode(msg.getBody(), sendBuf);
} else {
//如果为空则写四个自己进行补位
sendBuf.writeInt(0);
}
//更新数据长度 :从第四个字节开始更新(因为第四个字节之后的int类型表示整个数据包长度)
//总长度 = sendBuf.readableBytes() - 4(包头的起始位置) - 4(body数据长度占位长度)
// 因为netty认为 frame的长度为所载内容的长度,而不是报文的长度。 报文的长度为 length+lengthOffset+lengthFieldLength
sendBuf.setInt(4, sendBuf.readableBytes() - 4 - 4);
}
}
MarshallingEncoder:
package com.bfxy.netty.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import java.io.IOException;
import org.jboss.marshalling.Marshaller;
/**
* @author Lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
@Sharable//共享
public class MarshallingEncoder {
private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; //int
Marshaller marshaller;
public MarshallingEncoder() throws IOException {
marshaller = MarshallingCodecFactory.buildMarshalling();
}
protected void encode(Object msg, ByteBuf out) throws Exception {
try {
//起始数据位置
int lengthPos = out.writerIndex();
System.out.println("----序列化之前的lengthPos: " + lengthPos);
out.writeBytes(LENGTH_PLACEHOLDER); // 4
System.out.println("----预留四个字节,用于更新数据长度!执行序列化操作。。----");
ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
marshaller.start(output);
marshaller.writeObject(msg);
marshaller.finish();
System.out.println("----序列化完毕后的out.writerIndex(): " + out.writerIndex());
System.out.println("最后更新数据长度公式为:总长度-初始化长度-4个预留字节长度 = 数据长度,进行set设置值");
//写数据为:总长度-起始位置-4个字节为数据长度
out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
} finally {
marshaller.close();
}
}
}
ChannelBufferByteInput:
package com.bfxy.netty.codec;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.StreamCorruptedException;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;
/**
* @author Lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
public class MarshallingDecoder {
private final Unmarshaller unmarshaller;
/**
* Creates a new decoder whose maximum object size is {@code 1048576} bytes.
* If the size of the received object is greater than {@code 1048576} bytes,
* a {@link StreamCorruptedException} will be raised.
*
* @throws IOException
*
*/
public MarshallingDecoder() throws IOException {
this.unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
}
protected Object decode(ByteBuf in) throws Exception {
//先读取数据包的长度
int objectSize = in.readInt();
System.out.println("进行解码操作,首先获取数据总长度为: " + objectSize);
//进行slice截取从读取位置开始为:in.readerIndex() 读取后objectSize的数据长度
System.out.println("从in.readerIndex()= "+ in.readerIndex() +" 位置开始, 读取数据长度objectSize=" + objectSize + "的数据!");
ByteBuf buf = in.slice(in.readerIndex(), objectSize);
ByteInput input = new ChannelBufferByteInput(buf);
try {
//进行解码操作
this.unmarshaller.start(input);
Object obj = this.unmarshaller.readObject();
this.unmarshaller.finish();
//读取完毕后,更新当前读取起始位置为:in.readerIndex() + objectSize
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} finally {
this.unmarshaller.close();
}
}
}
NettyMessageDecoder:解码
package com.bfxy.netty.codec;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* @author Lilinfeng
* @date 2014年3月15日
* @version 1.0
*/
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
MarshallingDecoder marshallingDecoder;
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) throws IOException {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
marshallingDecoder = new MarshallingDecoder();
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
NettyMessage message = new NettyMessage();
Header header = new Header();
//获取crcCode ==> 占4个字节位置
header.setCrcCode(frame.readInt());
//获取数据包总长度 ==> 占4个字节位置
header.setLength(frame.readInt());
//获取sessionId ==> 占8个字节位置
header.setSessionID(frame.readLong());
//获取消息类型 ==> 占1个字节位置
header.setType(frame.readByte());
//获取消息优先级 ==> 占1个字节位置
header.setPriority(frame.readByte());
//获取附件个数 ==> 占4个字节位置
int size = frame.readInt();
//如果附件个数>0 证明存在附件
if (size > 0) {
Map<String, Object> attch = new HashMap<String, Object>(size);
int keySize = 0;
byte[] keyArray = null;
String key = null;
//循环附件个数,取值
for (int i = 0; i < size; i++) {
keySize = frame.readInt();
keyArray = new byte[keySize];
frame.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attch.put(key, marshallingDecoder.decode(frame));
}
keyArray = null;
key = null;
header.setAttachment(attch);
}
//如果最终数据包>4 证明是有数据的,则开始解码
if (frame.readableBytes() > 4) {
message.setBody(marshallingDecoder.decode(frame));
}
message.setHeader(header);
return message;
}
}
MarshallingDecoder:
package com.bfxy.netty.codec;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.StreamCorruptedException;
import org.jboss.marshalling.ByteInput;
import org.jboss.marshalling.Unmarshaller;
/**
* @author Lilinfeng
* @date 2014年3月14日
* @version 1.0
*/
public class MarshallingDecoder {
private final Unmarshaller unmarshaller;
/**
* Creates a new decoder whose maximum object size is {@code 1048576} bytes.
* If the size of the received object is greater than {@code 1048576} bytes,
* a {@link StreamCorruptedException} will be raised.
*
* @throws IOException
*
*/
public MarshallingDecoder() throws IOException {
this.unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
}
protected Object decode(ByteBuf in) throws Exception {
//先读取数据包的长度
int objectSize = in.readInt();
System.out.println("进行解码操作,首先获取数据总长度为: " + objectSize);
//进行slice截取从读取位置开始为:in.readerIndex() 读取后objectSize的数据长度
System.out.println("从in.readerIndex()= "+ in.readerIndex() +" 位置开始, 读取数据长度objectSize=" + objectSize + "的数据!");
ByteBuf buf = in.slice(in.readerIndex(), objectSize);
ByteInput input = new ChannelBufferByteInput(buf);
try {
//进行解码操作
this.unmarshaller.start(input);
Object obj = this.unmarshaller.readObject();
this.unmarshaller.finish();
//读取完毕后,更新当前读取起始位置为:in.readerIndex() + objectSize
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} finally {
this.unmarshaller.close();
}
}
}
ChannelBufferByteOutput:
package com.bfxy.netty.codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;
import java.io.IOException;
/**
* {@link ByteOutput} implementation which writes the data to a {@link ByteBuf}
*
*
*/
class ChannelBufferByteOutput implements ByteOutput {
private final ByteBuf buffer;
/**
* Create a new instance which use the given {@link ByteBuf}
*/
public ChannelBufferByteOutput(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public void close() throws IOException {
// Nothing to do
}
@Override
public void flush() throws IOException {
// nothing to do
}
@Override
public void write(int b) throws IOException {
buffer.writeByte(b);
}
@Override
public void write(byte[] bytes) throws IOException {
buffer.writeBytes(bytes);
}
@Override
public void write(byte[] bytes, int srcIndex, int length) throws IOException {
buffer.writeBytes(bytes, srcIndex, length);
}
/**
* Return the {@link ByteBuf} which contains the written content
*
*/
ByteBuf getBuffer() {
return buffer;
}
}
TestCodeC:测试
package com.bfxy.netty.codec;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import com.bfxy.netty.struct.Header;
import com.bfxy.netty.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* @author Administrator
* @date 2014年3月15日
* @version 1.0
*/
public class TestCodeC {
MarshallingEncoder marshallingEncoder;
MarshallingDecoder marshallingDecoder;
public TestCodeC() throws IOException {
marshallingDecoder = new MarshallingDecoder();
marshallingEncoder = new MarshallingEncoder();
}
public NettyMessage getMessage() {
NettyMessage nettyMessage = new NettyMessage();
Header header = new Header();
header.setLength(123);
header.setSessionID(99999);
header.setType((byte) 1);
header.setPriority((byte) 7);
Map<String, Object> attachment = new HashMap<String, Object>();
for (int i = 0; i < 10; i++) {
attachment.put("ciyt --> " + i, "lilinfeng " + i);
}
header.setAttachment(attachment);
nettyMessage.setHeader(header);
nettyMessage.setBody("abcdefg-----------------------AAAAAA");
return nettyMessage;
}
public ByteBuf encode(NettyMessage msg) throws Exception {
ByteBuf sendBuf = Unpooled.buffer();
//包头
sendBuf.writeInt((msg.getHeader().getCrcCode()));
//数据长度:
sendBuf.writeInt((msg.getHeader().getLength()));
sendBuf.writeLong((msg.getHeader().getSessionID()));
sendBuf.writeByte((msg.getHeader().getType()));
sendBuf.writeByte((msg.getHeader().getPriority()));
sendBuf.writeInt((msg.getHeader().getAttachment().size()));
String key = null;
byte[] keyArray = null;
Object value = null;
for (Map.Entry<String, Object> param : msg.getHeader().getAttachment()
.entrySet()) {
key = param.getKey();
keyArray = key.getBytes("UTF-8");
sendBuf.writeInt(keyArray.length);
sendBuf.writeBytes(keyArray);
value = param.getValue();
marshallingEncoder.encode(value, sendBuf);
}
key = null;
keyArray = null;
value = null;
if (msg.getBody() != null) {
marshallingEncoder.encode(msg.getBody(), sendBuf);
} else {
sendBuf.writeInt(0);
}
//更新数据长度
System.out.println("可读:" + sendBuf.readableBytes());
sendBuf.setInt(4, sendBuf.readableBytes());
return sendBuf;
}
public NettyMessage decode(ByteBuf in) throws Exception {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setCrcCode(in.readInt());
header.setLength(in.readInt());
header.setSessionID(in.readLong());
header.setType(in.readByte());
header.setPriority(in.readByte());
int size = in.readInt();
if (size > 0) {
Map<String, Object> attch = new HashMap<String, Object>(size);
int keySize = 0;
byte[] keyArray = null;
String key = null;
for (int i = 0; i < size; i++) {
keySize = in.readInt();
keyArray = new byte[keySize];
in.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attch.put(key, marshallingDecoder.decode(in));
}
keyArray = null;
key = null;
header.setAttachment(attch);
}
if (in.readableBytes() > 4) {
message.setBody(marshallingDecoder.decode(in));
}
message.setHeader(header);
return message;
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
TestCodeC testC = new TestCodeC();
NettyMessage message = testC.getMessage();
System.out.println(message + "[body ] " + message.getBody());
for (int i = 0; i < 1; i++) {
ByteBuf buf = testC.encode(message);
NettyMessage decodeMsg = testC.decode(buf);
System.out.println(decodeMsg + "[body ] " + decodeMsg.getBody());
System.out.println("-------------------------------------------------");
}
}
}
pom:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.bfxy</groupId>
<artifactId>netty-002</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>netty-002</name>
<description>netty-002</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.12.Final</version>
</dependency>
<!-- 序列化框架marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>1.3.0.CR9</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>1.3.0.CR9</version>
</dependency>
<!-- 序列化框架protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
最后
以上就是等待石头为你收集整理的使用netty自定义协议的全部内容,希望文章能够帮你解决使用netty自定义协议所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复