概述
文章目录
- 实现功能
- 通信模型
- Netty协议的编解码规范
- Netty协议解码
- 代码实现
- 依赖
- 消息结构定义
- 消息头定义 Header
- 消息定义 NettyMessage
- 消息类型定义 MessageType
- 返回结果定义
- 端口常量定义 NettyConstant
- 消息编解码
- ChannelBufferByteInput
- ChannelBufferByteOutput
- MarshallingCodecFactory
- MarshallingDecoder
- MarshallingEncoder
- NettyMessageDecoder
- NettyMessageEncoder
- 客户端实现
- ClientHandler
- 客户端心跳检测 HeartBeatReqHandler
- 客户端握手认证 LoginAuthReqHandler
- 客户端 NettyClient
- 服务端实现
- 服务端心跳检测 HeartBeatRespHandler
- 服务端握手认证 LoginAuthRespHandler
- ServerHandler
- NettyServer
- 测试
- 源码下载
- 参考
实现功能
- 基于Netty的NIO通信框架,提供高性能的异步通信能力;
- 提供消息的编码解码框架,可以实现POJO的序列化和反序列化;
- .提供基于IP地址的白名单接入认证机制;
- 链路的有效性校验机制;
- 链路的断连重连机制;
通信模型
Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之间都心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端端Ping消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,知道重连成功
具体步骤:
- Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息;
- Netty协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过之后,返回登录成功的握手应答消息;
- 链路建立成功之后,客户端发送业务消息;
- 链路建立成功之后,服务端发送心跳消息;
- 链路建立成功之后,客户端发送心跳消息;
- 链路建立成功之后,服务端发送业务消息
- 服务端推出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。
Netty协议通信双方练了路建立成功后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWO WAY或者ONE WAY。双方之前的心跳采用Ping-Pong机制,当链路处理控线状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N条Ping消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方处理异常状态,客户端主动关闭连接,间隔周期T后发起重连操作,知道重连成功
Netty协议的编解码规范
###Netty协议编码
Netty协议NettyMessage的编码规范如下:
- rcCode:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价;
2.length:java.nio.ByteBuffer.putInt(int value),如果采用其他缓冲区实现,必须与其等价; - sessionID:java.nio.ByteBuffer.putLong(long value),如果采用其他缓冲区实现,必须与其等价;
- type: java.nio.ByteBuffer.put(byte b),如果采用其他缓冲区实现,必须与其等价;
- priority:java.nio.ByteBuffer.put(byte b),如果采用其他缓冲区实现,必须与其等价;
- attachment:它的编码规则为——如果attachment长度为0,表示没有可选附件,则将长度编码设为0,java.nio.ByteBuffer.putInt(0);如果大于0,说明有附件需要编码,具体的编码规则如下:首先对附件的个数进行编码,java.nio.ByteBuffer.putInt(attachment.size());然后对Key进行编码,再将它转换成byte数组之后编码内容.
- body的编码:通过JBoss Marshalling将其序列化为byte数组,然后调用java.nio.ByteBuffer.put(byte [] src)将其写入ByteBuffer缓冲区中。
由于整个消息的长度必须等全部字段都编码完成之后才能确认,所以最后需要更新消息头中的length字段,将其重新写入ByteBuffer中。
Netty协议解码
相对于NettyMessage的编码,仍旧以java.nio.ByteBuffer为例,给出Netty协议的解码规范:
- crcCode:通过java.nio.ByteBuffer.getInt()获取校验码字段,其他缓冲区需要与其等价;
- length:通过java.nio.ByteBuffer.getInt()获取Netty消息的长度,其他缓冲区需要与其等价;
- sessionID:通过java.nio.ByteBuffer.getLong()获取会话ID,其他缓冲区需要与其等价;
- type:通过java.nio.ByteBuffer.get()获取消息类型,其他缓冲区需要与其等价;
- priority:通过java.nio.ByteBuffer.get()获取消息优先级,其他缓冲区需要与其等价;
- attachment:它的解码规则为——首先创建一个新的attachment对象,调用java.nio.ByteBuffer.getInt()获取附件的长度,如果为0,说明附件为空,解码结束,继续解消息体;如果非空,则根据长度通过for循环进行解码;
- body:通过JBoss的marshaller对其进行解码。
代码实现
依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
<!--其他可能用到的依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.2.Final</version>
</dependency>
</dependencies>
消息结构定义
消息头定义 Header
@Data
public final class Header {
private int crcCode = 0xadaf0105;
/**
* 消息长度
*/
private int length;
/**
* 会话ID
*/
private long sessionId;
/**
* 消息类型
*/
private byte type;
/**
* 消息优先级
*/
private byte priority;
/**
* 附件
*/
private Map<String, Object> attachment = new HashMap<>();
}
消息定义 NettyMessage
@Data
public final class NettyMessage {
/**
* 消息头
*/
private Header header;
/**
* 消息体
*/
private Object body;
}
消息类型定义 MessageType
public enum MessageType {
/**
* 业务请求消息
*/
SERVICE_REQ((byte) 0),
/**
* 业务响应(应答)消息
*/
SERVICE_RESP((byte) 1),
/**
* 业务ONE WAY消息(既是请求消息又是响应消息)
*/
ONE_WAY((byte) 2),
/**
* 握手请求消息
*/
LOGIN_REQ((byte) 3),
/**
* 握手响应(应答)消息
*/
LOGIN_RESP((byte) 4),
/**
* 心跳请求消息
*/
HEARTBEAT_REQ((byte) 5),
/**
* 心跳响应(应答)消息
*/
HEARTBEAT_RESP((byte) 6);
private byte value;
MessageType(byte value) {
this.value = value;
}
public byte value() {
return value;
}
}
返回结果定义
public enum ResultType {
/**
* 认证成功
*/
SUCCESS((byte) 0),
/**
* 认证失败
*/
FAIL((byte) -1),
;
private byte value;
private ResultType(byte value) {
this.value = value;
}
public byte value() {
return this.value;
}
}
端口常量定义 NettyConstant
public interface NettyConstant {
public static final String REMOTEIP = "127.0.0.1";
public static final int PORT = 8080;
public static final int LOCAL_PORT = 12088;
public static final String LOCALIP = "127.0.0.1";
}
消息编解码
ChannelBufferByteInput
/**
* @author WH
* @version 1.0
* @date 2020/5/31 13:58
* @Description 消息编码器
*/
public class ChannelBufferByteInput implements ByteInput {
private final ByteBuf buffer;
public ChannelBufferByteInput(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public void close() throws IOException {
// nothing to do
}
@Override
public int available() throws IOException {
return buffer.readableBytes();
}
@Override
public int read() throws IOException {
if (buffer.isReadable()) {
return buffer.readByte() & 0xff;
}
return -1;
}
@Override
public int read(byte[] array) throws IOException {
return read(array, 0, array.length);
}
@Override
public int read(byte[] dst, int dstIndex, int length) throws IOException {
int available = available();
if (available == 0) {
return -1;
}
length = Math.min(available, length);
buffer.readBytes(dst, dstIndex, length);
return length;
}
@Override
public long skip(long bytes) throws IOException {
int readable = buffer.readableBytes();
if (readable < bytes) {
bytes = readable;
}
buffer.readerIndex((int) (buffer.readerIndex() + bytes));
return bytes;
}
}
ChannelBufferByteOutput
package codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;
import java.io.IOException;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 14:10
* @Description 消息解码器
*/
public class ChannelBufferByteOutput implements ByteOutput {
private final ByteBuf buffer;
/**
* Create a new instance which use the given {@link ByteBuf}
*/
public ChannelBufferByteOutput(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public void close() throws IOException {
// Nothing to do
}
@Override
public void flush() throws IOException {
// nothing to do
}
@Override
public void write(int b) throws IOException {
buffer.writeByte(b);
}
@Override
public void write(byte[] bytes) throws IOException {
buffer.writeBytes(bytes);
}
@Override
public void write(byte[] bytes, int srcIndex, int length) throws IOException {
buffer.writeBytes(bytes, srcIndex, length);
}
/**
* Return the {@link ByteBuf} which contains the written content
*
*/
ByteBuf getBuffer() {
return buffer;
}
}
MarshallingCodecFactory
package codec;
import org.jboss.marshalling.*;
import java.io.IOException;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 14:12
* @Description 编码器
*/
public class MarshallingCodecFactory {
/**
* 创建Jboss Marshaller
*
* @return
* @throws IOException
*/
protected static Marshaller buildMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
Marshaller marshaller = marshallerFactory
.createMarshaller(configuration);
return marshaller;
}
/**
* 创建Jboss Unmarshaller
*
* @return
* @throws IOException
*/
protected static Unmarshaller buildUnMarshalling() throws IOException {
final MarshallerFactory marshallerFactory = Marshalling
.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
final Unmarshaller unmarshaller = marshallerFactory
.createUnmarshaller(configuration);
return unmarshaller;
}
}
MarshallingDecoder
package codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Unmarshaller;
import java.io.IOException;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 14:26
* @Description TODO
*/
public class MarshallingDecoder {
private final Unmarshaller unmarshaller;
public MarshallingDecoder() throws IOException {
unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
}
protected Object decode(ByteBuf in) throws Exception {
//1 首先读取4个长度(实际body内容长度)
int objectSize = in.readInt();
//2 获取实际body的缓冲内容
int readIndex = in.readerIndex();
ByteBuf buf = in.slice(readIndex, objectSize);
//3 转换
ChannelBufferByteInput input = new ChannelBufferByteInput(buf);
try {
//4 读取操作:
unmarshaller.start(input);
Object obj = unmarshaller.readObject();
unmarshaller.finish();
//5 读取完毕以后, 更新当前读取起始位置:
//因为使用slice方法,原buf的位置还在readIndex上,故需要将位置重新设置一下
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} finally {
unmarshaller.close();
}
}
}
MarshallingEncoder
package codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Marshaller;
import java.io.IOException;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 14:21
* @Description TODO
*/
public class MarshallingEncoder {
//空白占位: 用于预留设置 body的数据包长度
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
Marshaller marshaller;
public MarshallingEncoder() throws IOException {
marshaller = MarshallingCodecFactory.buildMarshalling();
}
protected void encode(Object msg, ByteBuf out) throws Exception {
try {
//必须要知道当前的数据位置是哪: 起始数据位置
//长度属性的位置索引
int lengthPos = out.writerIndex();
//占位写操作:先写一个4个字节的空的内容,记录在起始数据位置,用于设置内容长度
out.writeBytes(LENGTH_PLACEHOLDER);
ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
marshaller.start(output);
marshaller.writeObject(msg);
marshaller.finish();
//总长度(结束位置) - 初始化长度(起始位置) - 预留的长度 = body数据长度
int endPos = out.writerIndex();
out.setInt(lengthPos, endPos - lengthPos - 4);
} finally {
marshaller.close();
}
}
}
NettyMessageDecoder
package codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import message.Header;
import message.NettyMessage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 13:48
* @Description TODO
*/
public final class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
MarshallingDecoder marshallingDecoder;
/**
*
* @param maxFrameLength 第一个参数代表最大的序列化长度
* @param lengthFieldOffset 代表长度属性的偏移量 简单来说就是message中 总长度的起始位置(Header中的length属性的起始位置) 本例中为4
* @param lengthFieldLength 代表长度属性的长度 整个属性占多长(length属性为int,占4个字节) 4
* @throws IOException
*/
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
int lengthFieldLength) throws IOException {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
marshallingDecoder = new MarshallingDecoder();
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in)
throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
NettyMessage message = new NettyMessage();
Header header = new Header();
// 加通信标记认证逻辑
header.setCrcCode(frame.readInt());
header.setLength(frame.readInt());
header.setSessionId(frame.readLong());
header.setType(frame.readByte());
header.setPriority(frame.readByte());
int size = frame.readInt();
//附件个数大于0,则需要解码操作
if (size > 0) {
Map<String, Object> attch = new HashMap<String, Object>(size);
int keySize = 0;
byte[] keyArray = null;
String key = null;
for (int i = 0; i < size; i++) {
keySize = frame.readInt();
keyArray = new byte[keySize];
frame.readBytes(keyArray);
key = new String(keyArray, "UTF-8");
attch.put(key, marshallingDecoder.decode(frame));
}
keyArray = null;
key = null;
//解码完成放入attachment
header.setAttachment(attch);
}
message.setHeader(header);
//对于ByteBuf来说,读一个数据,就会少一个数据,所以读完header,剩下的应该就是body了
if (frame.readableBytes() > 4) {
message.setBody(marshallingDecoder.decode(frame));
}
return message;
}
}
NettyMessageEncoder
package codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import message.Header;
import message.NettyMessage;
import java.io.IOException;
import java.util.Map;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 14:15
* @Description TODO
*/
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
private MarshallingEncoder marshallingEncoder;
public NettyMessageEncoder() throws IOException {
this.marshallingEncoder = new MarshallingEncoder();
}
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf sendBuf) throws Exception {
if(msg == null || msg.getHeader() == null){
throw new Exception("编码失败,没有数据信息!");
}
//Head:
Header header = msg.getHeader();
sendBuf.writeInt(header.getCrcCode());//校验码
sendBuf.writeInt(header.getLength());//总长度
sendBuf.writeLong(header.getSessionId());//会话id
sendBuf.writeByte(header.getType());//消息类型
sendBuf.writeByte(header.getPriority());//优先级
//对附件信息进行编码
//编码规则为:如果attachment的长度为0,表示没有可选附件,则将长度 编码设置为0
//如果attachment长度大于0,则需要编码,规则:
//首先对附件的个数进行编码
sendBuf.writeInt((header.getAttachment().size())); //附件大小
String key = null;
byte[] keyArray = null;
Object value = null;
//然后对key进行编码,先编码长度,然后再将它转化为byte数组之后编码内容
for (Map.Entry<String, Object> param : header.getAttachment()
.entrySet()) {
key = param.getKey();
keyArray = key.getBytes(CharsetUtil.UTF_8);
sendBuf.writeInt(keyArray.length);//key的字符编码长度
sendBuf.writeBytes(keyArray);
value = param.getValue();
marshallingEncoder.encode(value, sendBuf);
}
key = null;
keyArray = null;
value = null;
//Body:
Object body = msg.getBody();
//如果不为空 说明: 有数据
if(body != null){
//使用MarshallingEncoder
this.marshallingEncoder.encode(body, sendBuf);
} else {
//如果没有数据 则进行补位 为了方便后续的 decoder操作
sendBuf.writeInt(0);
}
//最后我们要获取整个数据包的总长度 也就是 header + body 进行对 header length的设置
// TODO: 解释: 在这里必须要-8个字节 ,是因为要把CRC和长度本身占的减掉了
//(官方中给出的是:LengthFieldBasedFrameDecoder中的lengthFieldOffset+lengthFieldLength)
//总长度是在header协议的第二个标记字段中
//第一个参数是长度属性的索引位置
sendBuf.setInt(4, sendBuf.readableBytes() - 8);
}
}
客户端实现
ClientHandler
package client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import message.NettyMessage;
/**
* @author WH
* @version 1.0
* @date 2020/6/1 21:30
* @Description TODO
*/
@Slf4j
public class ClientHandler extends ChannelHandlerAdapter {
// 连接成功监听
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
NettyMessage message = (NettyMessage)msg;
System.err.println("Client receive message from server: " + message.getBody());
} finally {
ReferenceCountUtil.release(msg);
}
}
// 客户端断开连接监听
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.info("----------客户端断开连接-----------");
ctx.close();
}
}
客户端心跳检测 HeartBeatReqHandler
package client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;
import java.util.concurrent.TimeUnit;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:29
* @Description 客户端心跳检测
*/
@Slf4j
public class HeartBeatReqHandler extends ChannelHandlerAdapter {
private volatile ScheduledFuture<?> heartBeat;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 握手成功,主动发送心跳消息
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_RESP
.value()) {
heartBeat = ctx.executor().scheduleAtFixedRate(
new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,
TimeUnit.MILLISECONDS);
log.info("客户端发送心跳包");
} else if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HEARTBEAT_RESP
.value()) {
log.info("Client receive server heart beat message : ---> {}", message);
} else
ctx.fireChannelRead(msg);
}
private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx;
public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
NettyMessage heatBeat = buildHeatBeat();
log.info("Client send heart beat messsage to server : ---> {}", heatBeat);
ctx.writeAndFlush(heatBeat);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.HEARTBEAT_REQ.value());
message.setHeader(header);
return message;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}
}
客户端握手认证 LoginAuthReqHandler
package client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:23
* @Description 客户端握手认证
*/
@Slf4j
public class LoginAuthReqHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildLoginReq());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手应答消息,需要判断是否认证成功
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_RESP
.value()) {
byte loginResult = (byte) message.getBody();
if (loginResult != (byte) 0) {
// 握手失败,关闭连接
ctx.close();
} else {
log.info("Login is ok : {}", message);
ctx.fireChannelRead(msg);
}
} else
ctx.fireChannelRead(msg);
}
private NettyMessage buildLoginReq() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.LOGIN_REQ.value());
message.setHeader(header);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
客户端 NettyClient
package client;
import codec.NettyMessageDecoder;
import codec.NettyMessageEncoder;
import constant.NettyConstant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:33
* @Description 客户端
*/
public class NettyClient {
public static void main(String[] args) throws Exception {
new NettyClient().connect(NettyConstant.PORT, NettyConstant.REMOTEIP);
}
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//创建工作线程组
EventLoopGroup group = new NioEventLoopGroup();
public void connect(int port, String host) throws Exception {
// 配置客户端NIO线程组
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(50));
ch.pipeline().addLast("LoginAuthHandler", new LoginAuthReqHandler());
ch.pipeline().addLast("HeartBeatHandler", new HeartBeatReqHandler());
}
});
// 发起异步连接操作
ChannelFuture future = b.connect(new InetSocketAddress(host, port),
new InetSocketAddress(NettyConstant.LOCALIP, NettyConstant.LOCAL_PORT)).sync();
//手动发测试数据,验证是否会产生TCP粘包/拆包情况
// Channel c = future.channel();
//
// for (int i = 0; i < 500; i++) {
// NettyMessage message = new NettyMessage();
// Header header = new Header();
// header.setSessionID(1001L);
// header.setPriority((byte) 1);
// header.setType((byte) 0);
// message.setHeader(header);
// message.setBody("我是请求数据" + i);
// c.writeAndFlush(message);
// }
future.channel().closeFuture().sync();
} finally {
// 所有资源释放完成之后,清空资源,再次发起重连操作
executor.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
try {
connect(NettyConstant.PORT, NettyConstant.REMOTEIP);// 发起重连操作
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
服务端实现
服务端心跳检测 HeartBeatRespHandler
package server;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:31
* @Description TODO
*/
@Slf4j
public class HeartBeatRespHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 返回心跳应答消息
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.HEARTBEAT_REQ
.value()) {
log.info("Receive client heart beat message : ---> {}", message);
NettyMessage heartBeat = buildHeatBeat();
log.info("Send heart beat response message to client : ---> {}", heartBeat);
ctx.writeAndFlush(heartBeat);
} else
ctx.fireChannelRead(msg);
}
private NettyMessage buildHeatBeat() {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.HEARTBEAT_RESP.value());
message.setHeader(header);
return message;
}
}
服务端握手认证 LoginAuthRespHandler
package server;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.MessageType;
import message.NettyMessage;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:26
* @Description 服务端握手和安全认证
*/
@Slf4j
public class LoginAuthRespHandler extends ChannelHandlerAdapter {
private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<String, Boolean>();
private String[] whitekList = { "127.0.0.1", "192.168.56.1" };
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
NettyMessage message = (NettyMessage) msg;
// 如果是握手请求消息,处理,其它消息透传
if (message.getHeader() != null
&& message.getHeader().getType() == MessageType.LOGIN_REQ
.value()) {
String nodeIndex = ctx.channel().remoteAddress().toString();
NettyMessage loginResp = null;
// 重复登陆,拒绝
if (nodeCheck.containsKey(nodeIndex)) {
loginResp = buildResponse((byte) -1);
} else {
InetSocketAddress address = (InetSocketAddress) ctx.channel()
.remoteAddress();
String ip = address.getAddress().getHostAddress();
boolean isOK = false;
for (String WIP : whitekList) {
if (WIP.equals(ip)) {
isOK = true;
break;
}
}
loginResp = isOK ? buildResponse((byte) 0)
: buildResponse((byte) -1);
if (isOK)
nodeCheck.put(nodeIndex, true);
}
log.info("The login response is : {} body [ {} ]", loginResp, loginResp.getBody());
ctx.writeAndFlush(loginResp);
} else {
ctx.fireChannelRead(msg);
}
}
private NettyMessage buildResponse(byte result) {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setType(MessageType.LOGIN_RESP.value());
message.setHeader(header);
message.setBody(result);
return message;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
nodeCheck.remove(ctx.channel().remoteAddress().toString());// 删除缓存
ctx.close();
ctx.fireExceptionCaught(cause);
}
}
ServerHandler
package server;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import message.Header;
import message.NettyMessage;
/**
* @author WH
* @version 1.0
* @date 2020/6/1 21:31
* @Description TODO
*/
@Slf4j
public class ServerHandler extends ChannelHandlerAdapter {
/**
* 当我们通道进行激活的时候 触发的监听方法
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("通道激活");
}
/**
* 当我们的通道里有数据进行读取的时候 触发的监听方法
*/
@Override
public void channelRead(ChannelHandlerContext ctx /*NETTY服务上下文*/, Object msg /*实际的传输数据*/) throws Exception {
NettyMessage requestMessage = (NettyMessage) msg;
System.err.println("Server receive message from Client: " + requestMessage.getBody());
NettyMessage responseMessage = new NettyMessage();
Header header = new Header();
header.setSessionId(2002L);
header.setPriority((byte) 2);
header.setType((byte) 1);
responseMessage.setHeader(header);
responseMessage.setBody("我是响应数据: " + requestMessage.getBody());
ctx.writeAndFlush(responseMessage);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
log.info("--------数据读取完毕----------");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.info("--------服务器数据读异常----------:");
cause.printStackTrace();
ctx.close();
}
}
NettyServer
package server;
import codec.NettyMessageDecoder;
import codec.NettyMessageEncoder;
import constant.NettyConstant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
/**
* @author WH
* @version 1.0
* @date 2020/5/31 15:35
* @Description TODO
*/
@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
new NettyServer().bind();
}
public void bind() throws Exception {
//1 用于接受客户端连接的线程工作组
EventLoopGroup bossGroup = new NioEventLoopGroup();
//2 用于对接受客户端连接读写操作的线程工作组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//3 辅助类。用于帮助我们创建NETTY服务
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//绑定两个工作线程组
.channel(NioServerSocketChannel.class) //设置NIO的模式
.option(ChannelOption.SO_BACKLOG, 1024) //设置TCP缓冲区
.handler(new LoggingHandler(LogLevel.INFO))
// 初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws IOException {
ch.pipeline().addLast(
new NettyMessageDecoder(1024 * 1024, 4, 4));
ch.pipeline().addLast(new NettyMessageEncoder());
ch.pipeline().addLast("readTimeoutHandler",
new ReadTimeoutHandler(50));
ch.pipeline().addLast(new LoginAuthRespHandler());
ch.pipeline().addLast("HeartBeatHandler",
new HeartBeatRespHandler());
ch.pipeline().addLast(new ServerHandler());
}
});
// 绑定端口,同步等待成功
ChannelFuture cf = b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();
log.info("Netty server start ok : {} : {}",NettyConstant.REMOTEIP, NettyConstant.PORT);
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
测试
-
服务端
-
客户端
-
其他测试:服务端宕机重连,客户端宕机重连
源码下载
下载
参考
Netty权威指南
博客
最后
以上就是野性微笑为你收集整理的初识Netty六(自定义协议(私有协议)开发)的全部内容,希望文章能够帮你解决初识Netty六(自定义协议(私有协议)开发)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复