概述
实现多协议,多编解码器详解
- 前言
- netty中handler的执行顺序和条件(重要)
- 执行顺序
- 入栈handler介绍
- 出栈handler介绍
- 入栈handler处理器顺序
- 出栈handler处理器顺序
- 执行条件
- 编码演示
- 客户端
- 服务端
- 编解码器异常继续传递消息(扩展)
前言
先讲一下场景,我现在有一个需求,需要传递对象和字符串,其中对象要用protobuf来序列化进行通信,所以,这就产生了两个协议,一个字符串,一个protobuf,那么想要发送和接收这些消息,就需要具备字符串的编解码器和protobuf的编解码器。
当然,你可以说把对象序列化成json字符串传递,实现统一,这样也可以,但是不在本文讨论范围,其次,json传递效率是比较低的,还有就是全部转成json之后,在接收方进行解码判断的时候就比较麻烦了。
言归正传,接下来我们就开始正文讲解,为了防止粘包和拆包,我们需要再加一个分隔符的编解码器,这样下来,就会出现三个编解码器了,分隔符暂定用@。
netty中handler的执行顺序和条件(重要)
在正文开始之前,咱们先弄清楚handler执行顺序和条件,否则,接下来的内容,看似明白,实则还是无法融汇变通,举一反三,博主就是因为一开始没有先去理解这个概念,导致做了无数次试验,浪费了极多的时间。
执行顺序
handler执行顺序又分为入栈
和出栈
顺序,分别介绍一下,先提供一下接下来整个实践项目要用到的handler,方便下边讲解。
入栈handler介绍
凡是实现这个接口这就是入栈handler,比如我们常用的SimpleChannelInboundHandler、ChannelInboundHandlerAdapter等都是实现了ChannelInboundHandler
的,包括我们常用的解码器,比如StringDecoder、DelimiterBasedFrameDecoder等也都是实现了此接口。
出栈handler介绍
我们常用的编码器Encoder,比如StringEncoder等都是实现了此接口ChannelOutboundHandler
的。
总结就是,入栈handler就是专门用来拦截处理接收进来的消息,出栈handler就是专门用来拦截处理要发送出去的消息,例如我们常用的 ctx.writeAndFlush(“xxx”)就是要发送的消息,而各种编码器就是出栈handler。
入栈handler处理器顺序
为了方便讲解,我会将上述的一堆handler以出栈和入栈的类型拆分出来,进行图解,因为消息入栈是绝对不会经过出栈的handler的,反之亦然。
从上图可以看出来,入栈handler处理器顺序是从上到下,依次执行
,至于为什么有两个图,而且有的handler被跳过去了,先不用管,这属于handler的执行条件内容,后边会讲到。
出栈handler处理器顺序
从上边两个图可以看出,出栈顺序是按照从下往上,依次进行
的,至于为什么有两个图,这里就涉及到另一个知识点了:
1、
ctx.writeAndFlush
只会从当前的handler位置开始,往前找ChannelOutboundHandler执行;
2、ctx.pipeline().writeAndFlush
与ctx.channel().writeAndFlush
会从最开始的的位置,往前找ChannelOutboundHandler执行。
所以,用到本文的项目中,就会是这样的效果:
看完这个,可能大家就有疑问了,为什么消息走到ProtostuffEncoder就直接出去了,没有走上边的两个编码器呢,这就涉及到入栈和出栈执行条件的问题了。
执行条件
说明上边的问题之前,我们先来看一下handler发送的消息类型和各个编码器的具体实现。
handler发送的消息类型:
各编码器的实现:
注:String实现了CharSequence接口,所以泛型也属于String。
从上边四个图可以看到,最初发送的消息是ComputeResult对象
,那么显然,只会有泛型为ComputeResult的ProtostuffEncoder
编码器接收到了消息,其它两个因为泛型不匹配,所以无法接收消息,接着ProtostuffEncoder编码器进行一顿操作之后将消息以ByteBuf
的方式写出去,但是其它两个编码器还是因为泛型不匹配无法接收处理,所以最终出栈。
总结:无论是出栈handler,还是入栈handler,具体顺序怎么执行,是需要两个因素来决定,一个是当前handler所在的位置,二个就是当前handler的泛型,只有这两个条件同时满足,才会拦截到消息。
另外,格外提醒:一个handler接收到消息的泛型,一定是上个handler处理之后发送出来的泛型,而不是最初的handler发送的类型,就比如上边的案例,开始发送的是ComputeResult对象,经过ProtostuffEncoder编码器之后就变成了ByteBuf,如果后边有一个支持ByteBuf泛型的handler,那么就会拦截到这个消息了。
如果不标明泛型,那么泛型默认为object,比如这个解码器:
所以一般为了逻辑清晰,提高效率,防止被不需要的handler拦截,最好提前规定好每个handler的泛型,但是也不是一定,根据实际情况而定吧。
编码演示
protostuff自定义编码器参考上一篇文章:netty之Protostuff序列化协议。
由于经过
另外,对于protostuff解码器,请参考最下方扩展部分
,否则字符串消息无法传递到下一个string解码器。
客户端
/**
* @author: zhouwenjie
* @description: 客户端
* @create: 2020-04-03 17:14
**/
@Component
@Slf4j
public class NettyClient {
@Value("${monitor.server.host}")
private String host;
@Value("${monitor.server.port}")
private int port;
@Value("${monitor.delimiter}")
private String delimiter;
@Autowired
private NettyClientHandler nettyClientHandler;
private NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
private Bootstrap bootstrap;
@PostConstruct
public void run() throws UnknownHostException {
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.remoteAddress(host, port)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
//客户端初始化
socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));
socketChannel.pipeline().addLast(new ProtostuffDecoder(ComputeResult.class));
socketChannel.pipeline().addLast(new ProtostuffEncoder(delimiter));
socketChannel.pipeline().addLast(nettyClientHandler);
}
});
String hostAddress = InetAddress.getLocalHost().getHostAddress();
// 指定本机ip端口,用来给服务端区分,指定端口,重启客户端会等两分钟才能连接上服务端
bootstrap.localAddress(hostAddress,0);
//连接netty服务器
reconnect();
}
/**
* 功能描述: 断线重连,客户端有断线重连机制,就更不能使用异步阻塞了
* @param
* @return void
* @author zhouwenjie
* @date 2021/3/19 14:53
*/
public void reconnect() {
ChannelFuture channelFuture = bootstrap.connect();
//使用最新的ChannelFuture -> 开启最新的监听器
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.cause() != null) {
log.error("连接失败。。。");
future.channel().eventLoop().schedule(() -> reconnect(), 3, TimeUnit.SECONDS);
} else {
log.info("客户端连接成功。。。");
}
});
}
/**
* 关闭 client
*/
@PreDestroy
public void shutdown() {
// 优雅关闭 EventLoopGroup 对象
eventLoopGroup.shutdownGracefully();
log.info("[*Netty客户端关闭]");
}
}
服务端
NettyServerChannelInitializer
/**
* @author: zhouwenjie
* @description: 配置管道 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
* @create: 2020-04-03 14:14
**/
@Component
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Value("${monitor.delimiter}")
private String delimiter;
@Autowired
private NettyServerHandler nettyServerHandler;
@Autowired
private HeartBeatServerHandler heartBeatServerHandler;
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
//解码器,拦截消息去掉分隔符
pipeline.addLast(new DelimiterBasedFrameDecoder(1024*8, Unpooled.wrappedBuffer(delimiter.getBytes())));
//解码器,反序列化Protostuff消息为实际对象
pipeline.addLast(new ProtostuffDecoder(ComputeResult.class));
//解码器,字符串解码
pipeline.addLast(new StringDecoder(Charset.forName("UTF-8")));
//编码器,字符串编码
pipeline.addLast(new StringEncoder(Charset.forName("UTF-8")));
//编码器,在原消息后添加分隔符
pipeline.addLast(new DelimiterBasedFrameEncoder(delimiter));
//心跳处理器,处理心跳字符串消息
pipeline.addLast(heartBeatServerHandler);
//编码器,将对象进行Protostuff序列化
pipeline.addLast(new ProtostuffEncoder(delimiter));
//核心处理器,处理连接和消息的业务hanlder
pipeline.addLast(nettyServerHandler);
}
DelimiterBasedFrameEncoder
public class DelimiterBasedFrameEncoder extends MessageToMessageEncoder<String> {
private String delimiter;
public DelimiterBasedFrameEncoder(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, List<Object> out) {
// 在响应的数据后面添加分隔符
out.add(msg + delimiter);
}
}
NettyServer
/**
* @author: zhouwenjie
* @description: netty启动配置类
* @create: 2020-04-03 11:43
**/
@Slf4j
@Component
public class NettyServer {
@Autowired
private NettyServerChannelInitializer nettyServerChannelInitializer;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workerGroup;
@PostConstruct
public void start() {
//创建接收请求和处理请求的实例(默认线程数为 CPU 核心数乘以2也可自定义)
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(2);
try {
//创建服务端启动辅助类(boostrap 用来为 Netty 程序的启动组装配置一些必须要组件,例如上面的创建的两个线程组)
ServerBootstrap socketBs = new ServerBootstrap();
//channel 方法用于指定服务器端监听套接字通道
//socket配置
socketBs.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
.option(ChannelOption.SO_BACKLOG, 1000)
.childHandler(nettyServerChannelInitializer);
//默认的心跳间隔是7200s即2小时。Netty默认关闭该功能。
//.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = socketBs.bind(8688).sync();
future.addListener(future1 -> log.info("Netty服务端启动成功"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@PreDestroy
public void shutdown() {
// 优雅关闭两个 EventLoopGroup 对象
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
log.info("[*Netty服务端关闭成功]");
}
}
HeartBeatServerHandler
/**
* @author: zhouwenjie
* @description: 心跳检测处理器
* @create: 2022-03-25 16:12
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
@Override
public void channelRead0(ChannelHandlerContext ctx, String msg) {
// 接收到心跳请求,打印心跳消息,否则进入下一处理流程
try {
if ("ping".equals(msg)) {
ctx.writeAndFlush("pong");
log.info("[server] Receive client heart beat message : ----> {}", msg);
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 功能描述: 心跳检测
*
* @param ctx 这里的作用主要是解决断网,弱网的情况发生
* @param evt
* @return void
* @author zhouwenjie
* @date 2020/4/3 17:02
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("Client RSU: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
}
}
}
/**
* 在处理过程中引发异常时被调用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[server] heart response error: {}", cause.getMessage());
ctx.fireExceptionCaught(cause);
}
}
NettyServerHandler
/**
* @author: zhouwenjie
* @description: 服务端业务处理类
* @create: 2020-04-03 14:13
**/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<ComputeResult> {
public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 连接成功
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("客户端连接");
clients.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.error("[*The netty server suspends service...]");
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
log.error("[* Netty connection exception]:{}", cause.toString());
cause.printStackTrace();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ComputeResult computeResult) throws Exception {
System.out.println(computeResult);
ctx.writeAndFlush(computeResult);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
System.out.println("Client: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
}
}
}
}
好啦,这个明白了之后,根据自己的业务扩展起来就简单多了吧,只要把握好顺序和泛型,再复杂的编解码也能轻松搞定啦。
编解码器异常继续传递消息(扩展)
这里主要介绍,怎么在一个handler(编码或者解码)失败后,将消息传递到下一个编解码器。
/**
* @author: zhouwenjie
* @description:
* @create: 2022-07-12 11:17
**/
public class ProtostuffDecoder<T> extends ByteToMessageDecoder {
private Class<T> clazz;
public ProtostuffDecoder(Class<T> clazz) {
this.clazz = clazz;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
byte[] body = new byte[in.readableBytes()]; //传输正常
in.readBytes(body);
out.add(ProtostuffUtils.deserialize(body, clazz));
} catch (Exception e) {
e.printStackTrace();
}
}
}
如上代码,如果在 out.add(ProtostuffUtils.deserialize(body, clazz));
代码出错,怎么才能让消息继续传递给下一个解码器呢?
加下边的代码即可:
/**
* @author: zhouwenjie
* @description:
* @create: 2022-07-12 11:17
**/
public class ProtostuffDecoder<T> extends MessageToMessageDecoder<ByteBuf>{
private Class<T> clazz;
public ProtostuffDecoder(Class<T> clazz) {
this.clazz = clazz;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
byte[] body = new byte[in.readableBytes()]; //传输正常
in.readBytes(body);
out.add(ProtostuffUtils.deserialize(body, clazz));
} catch (Exception e) {
// 重置读取字节索引,因为上边已经读了(readBytes),不加这个会导致数据为空
in.resetReaderIndex();
// 这里是复制流,复制一份,防止skipBytes跳过,导致传递的消息变成空;
//同时还解决引用计数器为0的异常:refCnt: 0, decrement: 1。
ByteBuf buffer = in.retainedDuplicate();
//解决 decode() did not read anything but decoded a message的异常
//原因是netty不允许有字节内容不读的情况发生,所以采用下边的方法解决。
in.skipBytes(in.readableBytes());
out.add(buffer);
}
}
}
最后
以上就是老实信封为你收集整理的netty实现多协议,多编解码器前言netty中handler的执行顺序和条件(重要)编码演示编解码器异常继续传递消息(扩展)的全部内容,希望文章能够帮你解决netty实现多协议,多编解码器前言netty中handler的执行顺序和条件(重要)编码演示编解码器异常继续传递消息(扩展)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复