概述
私有协议介绍
由于现代软件的复杂性,一个大型软件系统往往会被人为地拆分称为多个模块,另外随着移动互联网的兴起,网站的规模越来越大,业务功能越来越多,往往需要集群和分布式部署。模块之间的通信就需要进行跨节点通信。
传统的Java应用中节点通信的常用方式:
- rmi远程服务调用
- Java Socket + Java序列化
- RPC框架 Thrift、Apache的Avro等
- 利用标准的公有协议进行跨节点调用,例如HTTP+XML,Restful+JSON或WebService
下面使用Netty设计私有协议
除了链路层的物理连接外,还需要对请求和响应消息进行编解码。 在请求和应答之外,还需要控制和管理类指令,例如链路建立的握手信息,链路检测的心跳信息。这些功能组合到一起后,就会形成私有协议。
- 每个Netty节点(Netty进程)之间建立长连接,使用Netty协议进行通信。
- Netty节点没有客户端和服务端的区别,谁首先发起连接,谁就是客户端。
网络拓扑图:
协议栈功能描述:
- 基于Netty的NIO通信框架,提供高性能的异步通信能力;
- 提供消息的编解码框架,实现POJO的序列化和反序列化
- 提供基于IP地址的白名单接入认证机制;
- 链路的有效性校验机制;
- 链路的断线重连机制;
通信模型
具体步骤:
- Netty协议栈客户端发送握手请求信息,携带节点ID等有效身份认证信息;
- Netty协议服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息;
- 链路建立成功之后,客户端发送业务消息;
- 链路成功之后,服务端发送心跳消息;
- 链路建立成功之后,客户端发送心跳消息;
- 链路建立成功之后,服务端发送业务消息;
- 服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接。
消息定义
类似于http协议,消息分为消息头和消息体。其中消息体是一个Object类型,消息头则如下所示:
名称 | 类型 | 长度 | 描述 |
length | 整型 int | 32 | 消息长度,整个消息,包括消息头和消息体 |
sessionId | 长整型long | 64 | 集群节点内全局唯一,由会话ID生成器生成 |
type | Byte | 8 | 0: 表示请求消息 1: 业务响应消息 2: 业务ONE WAY消息(即是请求又是响应消息) 3: 握手请求消息 4: 握手应答消息 5: 心跳请求消息 6: 心跳应答消息 |
priority | Byte | 8 | 消息优先级: 0-255 |
支持的字段类型
Netty协议栈开发
数据结构定义
消息头:
package com.example.netty.custom.struct;
/**
* 消息协议头
* @author lanx
* @date 2022/3/25
*/
public class Header {
private int crcCode = 0xadaf123; //唯一通信标识
private int length;//总消息的长度
private long sessionId;//会话id
private byte type;//消息类型
private byte priority;//消息优先级
public int getCrcCode() {
return crcCode;
}
public void setCrcCode(int crcCode) {
this.crcCode = crcCode;
}
public int getLength() {
return length;
}
public void setLength(int length) {
this.length = length;
}
public long getSessionId() {
return sessionId;
}
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
}
public byte getType() {
return type;
}
public void setType(byte type) {
this.type = type;
}
public byte getPriority() {
return priority;
}
public void setPriority(byte priority) {
this.priority = priority;
}
}
package com.example.netty.custom.struct;
/**
*消息协议
* @author lanx
* @date 2022/3/25
*/
public final class NettyMessage {
private Header header;
private Object body;
public final Header getHeader() {
return header;
}
public final void setHeader(Header header) {
this.header = header;
}
public final Object getBody() {
return body;
}
public final void setBody(Object body) {
this.body = body;
}
}
消息编解码
由于依赖于JBoss Marshalling...,添加maven依赖
<!--netty依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.20.Final</version>
</dependency>
<!-- marshalling -->
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.0.Beta2</version>
</dependency>
Marshalling 工厂类
package com.example.netty.custom.codec;
import org.jboss.marshalling.*;
import java.io.IOException;
/**
* Marshalling 工厂类
*
* @author lanx
* @date 2022/3/22
*/
public final class MarshallingCodecFactory {
/**
* 创建Jboss Marshaller
*
* @return
* @throws IOException
*/
public static Marshaller buildMarshalling() throws IOException {
//首先通过Marshalling 工具类的静态方法获取Marshalling实例对象,参数serial标识创建的是java序列化工厂对象
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建 MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
Marshaller marshaller = marshallerFactory.createMarshaller(configuration);
return marshaller;
}
/**
* 创建Jboss Unmarshaller
*
* @return
* @throws IOException
*/
public static Unmarshaller buildUnMarshalling() throws IOException {
//首先通过Marshalling 工具类的静态方法获取Marshalling实例对象,参数serial标识创建的是java序列化工厂对象
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建 MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(configuration);
return unmarshaller;
}
}
增加JBossMarshalling序列化对象->ByteBuf工具
package com.example.netty.custom.codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Marshaller;
import java.io.IOException;
/**
*
* 编码
* JBossMarshalling序列化对象->ByteBuf工具
*
* @author lanx
* @date 2022/3/26
*/
public class MyMarshallingEncoder {
// 占位数组 存储body数据长度
private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
private Marshaller marshaller;
public MyMarshallingEncoder() throws IOException {
this.marshaller = MarshallingCodecFactory.buildMarshalling();
}
// 使用marshall对Object进行编码,并且写入bytebuf...
public void encode(Object body, ByteBuf out) throws IOException {
try {
//1. 获取写入位置 数据起始位置
int lengthPos = out.writerIndex();
//2. 先写入4个bytes,用于记录Object对象编码后长度
out.writeBytes(LENGTH_PLACEHOLDER);
//3. 使用代理对象,防止marshaller写完之后关闭byte buf
ChannelBufferByteOutput output = new ChannelBufferByteOutput(out);
//4. 开始使用marshaller往bytebuf中编码
marshaller.start(output);
marshaller.writeObject(body);
//5. 结束编码
marshaller.finish();
//6. 设置对象长度 || 总长度 - 初始化长度(起始位置) - 预留长度 = body数据长度
out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);
} catch (IOException e) {
e.printStackTrace();
} finally {
marshaller.close();
}
}
}
package com.example.netty.custom.codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;
import java.io.IOException;
/**
* 输出字节流工具
* @author lanx
* @date 2022/3/26
*/
public class ChannelBufferByteOutput implements ByteOutput {
private final ByteBuf buffer;
/**
* Create a new instance which use the given {@link ByteBuf}
*/
public ChannelBufferByteOutput(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public void close() throws IOException {
// Nothing to do
}
@Override
public void flush() throws IOException {
// nothing to do
}
@Override
public void write(int b) throws IOException {
buffer.writeByte(b);
}
@Override
public void write(byte[] bytes) throws IOException {
buffer.writeBytes(bytes);
}
@Override
public void write(byte[] bytes, int srcIndex, int length) throws IOException {
buffer.writeBytes(bytes, srcIndex, length);
}
/**
* Return the {@link ByteBuf} which contains the written content
*/
ByteBuf getBuffer() {
return buffer;
}
}
增加JBossMarshalling反序列化对象<-ByteBuf工具
package com.example.netty.custom.codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Unmarshaller;
import java.io.IOException;
/**
* 解码
* JBossMarshalling反序列化对象<-ByteBuf工具
*
* @author lanx
* @date 2022/3/26
*/
public class MyMarshallingDecoder {
private final Unmarshaller unmarshaller;
public MyMarshallingDecoder() throws IOException {
unmarshaller = MarshallingCodecFactory.buildUnMarshalling();
}
protected Object decode(ByteBuf in) throws Exception {
//1. 读取第一个4bytes,里面放置的是object对象的byte长度
int objectSize = in.readInt();
// 从当前可读取 读取到object对象的byte长度(获取实际body内容长度)
ByteBuf buf = in.slice(in.readerIndex(), objectSize);
//2 . 使用bytebuf的代理类
ChannelBufferByteInput input = new ChannelBufferByteInput(buf);
try {
//3. 开始解码
unmarshaller.start(input);
Object obj = unmarshaller.readObject();
unmarshaller.finish();
//4. 读完之后设置读取的位置(更新起始位置) :因为使用slice 读取数据只是复制了一段数据,读取完成后读取位置没有修改缓冲器可读取位置,
// 使用之后需要设置当前已经读取到的位置 便于下次数据的读取:
in.readerIndex(in.readerIndex() + objectSize);
return obj;
} finally {
unmarshaller.close();
}
}
}
package com.example.netty.custom.codec;
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;
import java.io.IOException;
/**
* 输入字节流 工具
* @author lanx
* @date 2022/3/26
*/
public class ChannelBufferByteInput implements ByteInput {
private final ByteBuf buffer;
public ChannelBufferByteInput(ByteBuf buffer) {
this.buffer = buffer;
}
@Override
public void close() throws IOException {
// nothing to do
}
@Override
public int available() throws IOException {
return buffer.readableBytes();
}
@Override
public int read() throws IOException {
if (buffer.isReadable()) {
return buffer.readByte() & 0xff;
}
return -1;
}
@Override
public int read(byte[] array) throws IOException {
return read(array, 0, array.length);
}
@Override
public int read(byte[] dst, int dstIndex, int length) throws IOException {
int available = available();
if (available == 0) {
return -1;
}
length = Math.min(available, length);
buffer.readBytes(dst, dstIndex, length);
return length;
}
@Override
public long skip(long bytes) throws IOException {
int readable = buffer.readableBytes();
if (readable < bytes) {
bytes = readable;
}
buffer.readerIndex((int) (buffer.readerIndex() + bytes));
return bytes;
}
}
下面根据上述所说的进行对消息编解码:
package com.example.netty.custom.codec;
import com.example.netty.custom.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
/**
* 编码器
*
* @author lanx
* @date 2022/3/25
*/
public class NettyMessageEncoder extends MessageToByteEncoder<NettyMessage> {
private MyMarshallingEncoder myMarshallingEncoder;
public NettyMessageEncoder() throws IOException {
this.myMarshallingEncoder = new MyMarshallingEncoder();
}
@Override
protected void encode(ChannelHandlerContext ctx, NettyMessage msg, ByteBuf out) throws Exception {
//Head 信息
out.writeInt(msg.getHeader().getCrcCode());
out.writeInt(msg.getHeader().getLength());
out.writeLong(msg.getHeader().getSessionId());
out.writeByte(msg.getHeader().getType());
out.writeByte(msg.getHeader().getPriority());
//Body 信息
Object body = msg.getBody();
if (body != null) {
//使用 MyMarshallingEncoder 编码
myMarshallingEncoder.encode(body, out);
} else {
out.writeInt(0);
}
// 之前写了crcCode 4bytes,除去crcCode和length 8bytes即为更新之后的字节
out.setInt(4, out.readableBytes() - 8);
}
}
package com.example.netty.custom.codec;
import com.example.netty.custom.struct.Header;
import com.example.netty.custom.struct.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import java.io.IOException;
/**
* 解码器
* LengthFieldBasedFrameDecoder 为了解决拆包粘包问题
*
* @author lanx
* @date 2022/3/25
*/
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
private MyMarshallingDecoder myMarshallingDecoder;
public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
int lengthFieldLength) throws IOException {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
this.myMarshallingDecoder = new MyMarshallingDecoder();
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//调用父类(LengthFieldBasedFrameDecoder)方法
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setCrcCode(frame.readInt());
header.setLength(frame.readInt());
header.setSessionId(frame.readLong());
header.setType(frame.readByte());
header.setPriority(frame.readByte());
if (frame.readableBytes() > 4) {
message.setBody(myMarshallingDecoder.decode(frame));
}
message.setHeader(header);
return message;
}
}
关键在于解码器继承了LengthFieldBasedFrameDecoder,三个参数:
ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));
第一个参数:1024*1024: 最大长度
第二个参数: 从第4个bytes开始表示是长度
第三个参数: 有4个bytes的长度表示是长度
客户端代码
package com.example.netty.custom.client;
import com.example.netty.custom.codec.NettyMessageDecoder;
import com.example.netty.custom.codec.NettyMessageEncoder;
import com.example.netty.custom.domain.ResqData;
import com.example.netty.custom.struct.Header;
import com.example.netty.custom.struct.NettyMessage;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.UUID;
/**
* netty 客户端
*
* @author lanx
* @date 2022/3/20
*/
public class Client {
public static void main(String[] args) throws Exception {
//线程工作组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
Bootstrap b = new Bootstrap();
b.group(workerGroup)//绑定两个工作组
.channel(NioSocketChannel.class)//设置NIO模式
//初始化绑定服务通道
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//序列化
sc.pipeline().addLast(new NettyMessageDecoder(1024 * 1025 * 5, 4, 4));
sc.pipeline().addLast(new NettyMessageEncoder());
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture cf = b.connect("127.0.0.1", 8765).syncUninterruptibly();
for (int i = 0; i < 1000000; i++) {
NettyMessage message = new NettyMessage();
Header header = new Header();
header.setSessionId(1000L);
header.setType((byte) 1);
header.setPriority((byte) 1);
message.setHeader(header);
ResqData resqData = new ResqData();
resqData.setId(UUID.randomUUID().toString());
resqData.setData("test-" + i);
message.setBody(resqData);
cf.channel().writeAndFlush(message);
}
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}
}
package com.example.netty.custom.client;
import com.alibaba.fastjson.JSON;
import com.example.netty.custom.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* 客户端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端通道激活---------");
}
/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
NettyMessage message = (NettyMessage)msg;
System.out.println("客户端:获取服务端响应数据-->"+ JSON.toJSONString(message));
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放数据 (如果你读取数据后又写出去数据就不需要调用此方法,因为底层代码帮忙实现额释放数据)
ReferenceCountUtil.release(msg);
}
}
/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------客户端数据读取完毕---------");
}
/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------客户端数据读取异常---------");
cause.printStackTrace();
ctx.close();
}
}
服务端
package com.example.netty.custom.server;
import com.example.netty.custom.codec.NettyMessageDecoder;
import com.example.netty.custom.codec.NettyMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* netty 服务端
*
* @author lanx
* @date 2022/3/20
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
//用户接收客户端连接的线程工作组
EventLoopGroup bossGroup = new NioEventLoopGroup();
//用于接收客户端连接读写操作的线程组
EventLoopGroup workerGroup = new NioEventLoopGroup();
//辅助类 帮我我们创建netty服务
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//绑定两个工作组
.channel(NioServerSocketChannel.class)//设置NIO模式
//option 针对于服务端配置; childOption 针对于客户端连接通道配置
.option(ChannelOption.SO_BACKLOG, 1024)//设置tcp缓冲区
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024)//设置发送数据的缓存大小
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024)//设置读取数据的缓存大小
.childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持长连接
//初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
//序列化
sc.pipeline().addLast(new NettyMessageDecoder(1024*1025*5,4,4));
sc.pipeline().addLast(new NettyMessageEncoder());
//为通道进行初始化:数据传输过来的时候会进行拦截和执行 (可以有多个拦截器)
sc.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture cf = b.bind(8765).sync();
//释放连接
cf.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
package com.example.netty.custom.server;
import com.alibaba.fastjson.JSON;
import com.example.netty.custom.domain.ResqData;
import com.example.netty.custom.struct.Header;
import com.example.netty.custom.struct.NettyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import java.util.UUID;
/**
* 服务端 监听器
*
* @author lanx
* @date 2022/3/20
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当我们的通道被激活的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端通道激活---------");
}
/**
* 当我们通道里有数据进行读取的时候触发的监听
*
* @param ctx netty服务上下文
* @param msg 实际传输的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
NettyMessage message = (NettyMessage)msg;
System.out.println("服务端:获取客户端数据-->"+ JSON.toJSONString(message));
NettyMessage respMessage = new NettyMessage();
Header header = new Header();
header.setSessionId(1000L);
header.setType((byte) 1);
header.setPriority((byte) 1);
respMessage.setHeader(header);
ResqData resqData = new ResqData();
resqData.setId(UUID.randomUUID().toString());
resqData.setData("test-0");
respMessage.setBody(resqData);
//响应给客户端的数据
ctx.writeAndFlush(respMessage);
}catch (Exception e){
e.printStackTrace();
}finally {
//释放数据
ReferenceCountUtil.release(msg);
}
}
/**
* 当我们读取完成数据的时候触发的监听
*
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("--------服务端数据读取完毕---------");
}
/**
* 当我们读取数据异常的时候触发的监听
*
* @param ctx
* @param cause
* @throws Exception
*/
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("--------服务端数据读取异常---------");
cause.printStackTrace();
ctx.close();
}
}
传输对象body
package com.example.netty.custom.domain;
import java.io.Serializable;
/**
* 请求数据
* @author lanx
* @date 2022/3/22
*/
public class ResqData implements Serializable {
private static final long serialVersionUID = 8241970228716425282L;
private String id;
private String data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
package com.example.netty.custom.domain;
import java.io.Serializable;
/**
* 响应数据
* @author lanx
* @date 2022/3/22
*/
public class RespData implements Serializable {
private static final long serialVersionUID = 8241970228716425282L;
private String id;
private String data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
最后
以上就是自信香水为你收集整理的Netty 私有协议栈私有协议介绍Netty协议栈开发的全部内容,希望文章能够帮你解决Netty 私有协议栈私有协议介绍Netty协议栈开发所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复