我是靠谱客的博主 专一音响,这篇文章主要介绍Netty(3)心跳检测、WebSocket、Protobuf一 心跳检测二 实现WebSocket服务器三 Protobuf参考,现在分享给大家,希望可以做个参考。

一 心跳检测

当连接数很大时,我们想要释放部分空闲连接;或者连接已经断了但服务端没检测到。我们都可以使用Netty自动的心跳检测类IdleStateHandler来实现

1.1 类介绍

IdleStateHandler 是netty 提供的处理空闲状态的处理器.我们一般直接在ChannelInitializer中配置即可。该类会当channel一段时间没有执行读或写或读写时,触发一个IdleStateEvent.这个事件会传递给管道 的下一个handler去处理:通过调用(触发)下一个handler 的userEventTiggered 方法, 我们在该方法中去处理IdleStateEvent(读空闲,写空闲,读写空闲),实现自己的逻辑

构造方法参数说明

  1. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
  2. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
  3. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接

1.2 启动类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public static void main(String[] args) throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一个netty 提供 IdleStateHandler pipeline.addLast(new IdleStateHandler(7,20,30, TimeUnit.SECONDS)); //加入一个对空闲检测进一步处理的handler(自定义) pipeline.addLast(new MyServerHandler()); } }); //启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }

1.3 自定义Handler

我们可以在这里做日志、或者提醒、或者直接释放资源。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class MyServerHandler extends ChannelInboundHandlerAdapter { /** * * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { //将 evt 向下转型 IdleStateEvent IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "读空闲"; break; case WRITER_IDLE: eventType = "写空闲"; break; case ALL_IDLE: eventType = "读写空闲"; break; } System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType); System.out.println("服务器做相应处理.."); //如果发生空闲,我们关闭通道 // ctx.channel().close(); } } }

1.4 测试

在这里插入图片描述

二 实现WebSocket服务器

2.1 启动类

这里面用到了多个netty自动的handler,也会在后续说明

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void main(String[] args) throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //因为基于http协议,使用http的编码和解码器 pipeline.addLast(new HttpServerCodec()); //是以块方式写,添加ChunkedWriteHandler处理器 pipeline.addLast(new ChunkedWriteHandler()); /* 说明 1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合 2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求 */ pipeline.addLast(new HttpObjectAggregator(8192)); /* 说明 1. 对应websocket ,它的数据是以 帧(frame) 形式传递 2. 可以看到WebSocketFrame 下面有六个子类 3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri 4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接 5. 是通过一个 状态码 101 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello2")); //自定义的handler ,处理业务逻辑 pipeline.addLast(new MyTextWebSocketFrameHandler()); } }); //启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }

HttpServerCodec

如果服务器会用到http协议,可以添加此编码器。会帮我们解析http请求,生成http响应

可以看到本质还是HttpServerRequestDecoder和HttpServerResponseEncoder在工作

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
public HttpServerCodec() { this(4096, 8192, 8192); } /** * Creates a new instance with the specified decoder options. */ public HttpServerCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) { init(new HttpServerRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize), new HttpServerResponseEncoder()); }

ChunkedWriteHandler

该通道处理器主要是为了处理大文件传输的情形。大文件传输时,需要复杂的状态管理,而ChunkedWriteHandler实现这个功能

参考:http协议之chunked

HttpObjectAggregator

它负责把多个HttpMessage组装成一个完整的Http请求或者响应。到底是组装成请求还是响应,则取决于它所处理的内容是请求的内容,还是响应的内容。这其实可以通过Inbound和Outbound来判断,对于Server端而言,在Inbound 端接收请求,在Outbound端返回响应。

如果Server向Client返回的数据指定的传输编码是chunked。则,Server不需要知道发送给Client的数据总长度是多少,它是通过分块发送的,参考分块传输编码

注意,HttpObjectAggregator通道处理器必须放到HttpRequestDecoder或者HttpRequestEncoder后面

参考:https://www.cnblogs.com/hapjin/p/5364416.html

WebSocketServerProtocolHandler

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/** * This handler does all the heavy lifting for you to run a websocket server. * * It takes care of websocket handshaking as well as processing of control frames (Close, Ping, Pong). Text and Binary * data frames are passed to the next handler in the pipeline (implemented by you) for processing. * * See <tt>io.netty.example.http.websocketx.html5.WebSocketServer</tt> for usage. * * The implementation of this handler assumes that you just want to run a websocket server and not process other types * HTTP requests (like GET and POST). If you wish to support both HTTP requests and websockets in the one server, refer * to the <tt>io.netty.example.http.websocketx.server.WebSocketServer</tt> example. * * To know once a handshake was done you can intercept the * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)} and check if the event was instance * of {@link HandshakeComplete}, the event will contain extra information about the handshake such as the request and * selected subprotocol. */

这个处理程序为您完成运行websocket服务器的所有繁重工作。
它负责处理websocket握手以及控制帧(关闭、Ping、Pong)的处理。文本和二进制数据帧被传递到管道中的下一个处理程序(由您实现)进行处理。
此处理程序的实现假定您只想运行Websocket服务器,而不处理其他类型的HTTP请求(如GET和POST)。如果您希望在一台服务器上同时支持HTTP请求和Websocket,请参见io.netty.example.http.websocketx.server.WebSocketServer示例

2.2 自定义Handler

注意类声明时的类型TextWebSocketFrame,读写数据时都要使用该类型

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.atguigu.netty.websocket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; //这里 TextWebSocketFrame 类型,表示一个文本帧(frame) public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服务器收到消息 " + msg.text()); //回复消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text())); } //当web客户端连接后, 触发方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一 System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText()); System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生 " + cause.getMessage()); ctx.close(); //关闭连接 } }

2.3 测试

找一个在线WebSocket测试网站
在这里插入图片描述

三 Protobuf

3.1 网络传输的都是二级制

之前我们写的demo,收发看到的都是字符串。但实际上在网络传输时,这些都变成了二级制。

因此在编写网络应用程序时。在发送数据时需要编码,接收数据时需要解码
在这里插入图片描述

codec(编解码器)的组成部分有两个:decoder(解码器)和 encoder(编码器)。encoder 负责把业务数据转换成字节码数据,decoder 负责把字节码数据转换成业务数据

3.2 Netty自带的编解码器

Netty 提供的编码器

  • StringEncoder,对字符串数据进行编码
  • ObjectEncoder,对 Java 对象进行编码
  • 等等

Netty 提供的解码器

  • StringDecoder, 对字符串数据进行解码
  • ObjectDecoder,对 Java 对象进行解码
  • 等等

java序列化的缺陷

Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是Java 序列化技术 , 而Java 序列化技术本身效率就不高,存在如下问题

  • 无法跨语言
  • 序列化后的体积太大,是二进制编码的 5 倍多。
  • 序列化性能太低

解决方案就是下面的Protobuf

3.3 Protobuf

Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式,可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式
官网

Protobuf 是以message的方式来管理数据的.

支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等)

Protobbuf的知识点比较多,我也还没仔细看过。之后再去研究,现在先继续看如何整合

编写一个proto文件

为了跨语言,protobuf有一套自己的语法。我们编写一个对应的文件‘

注意点

  1. 枚举类型中的下标从0开始
  2. 普通message的属性下标从1开始
  3. oneof 表示只能出现其中的一个
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
syntax = "proto3"; option optimize_for = SPEED; // 加快解析 option java_package="com.atguigu.netty.codec2"; //指定生成到哪个包下 option java_outer_classname="MyDataInfo"; // 外部类名, 文件名 //protobuf 可以使用message 管理其他的message message MyMessage { //定义一个枚举类型 enum DataType { StudentType = 0; //在proto3 要求enum的编号从0开始 WorkerType = 1; } //用data_type 来标识传的是哪一个枚举类型 DataType data_type = 1; //表示每次枚举类型最多只能出现其中的一个, 节省空间 oneof dataBody { Student student = 2; Worker worker = 3; } } message Student { int32 id = 1;//Student类的属性 string name = 2; // } message Worker { string name=1; int32 age=2; }

生成java文件

复制代码
1
2
protoc.exe Student.proto --java_out=./

在这里插入图片描述
可以把生成的java文件复制到需要的包下(这里肯定不是最佳实践,因为现在还不熟悉protobuf

3.4 编写服务端

启动类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public static void main(String[] args) throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态 // .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //在pipeline加入ProtoBufDecoder //指定对哪种对象进行解码 pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance())); pipeline.addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象 //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }

自定义handler

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/* 说明 1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范) 2. 这时我们自定义一个Handler , 才能称为一个handler */ //public class NettyServerHandler extends ChannelInboundHandlerAdapter { public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> { //读取数据实际(这里我们可以读取客户端发送的消息) /* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception { //根据dataType 来显示不同的信息 MyDataInfo.MyMessage.DataType dataType = msg.getDataType(); if(dataType == MyDataInfo.MyMessage.DataType.StudentType) { MyDataInfo.Student student = msg.getStudent(); System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName()); } else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) { MyDataInfo.Worker worker = msg.getWorker(); System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge()); } else { System.out.println("传输的类型不正确"); } } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } //处理异常, 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }

3.5 客户端

启动类

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //在pipeline中加入 ProtoBufEncoder pipeline.addLast("encoder", new ProtobufEncoder()); pipeline.addLast(new NettyClientHandler()); //加入自己的处理器 } }); System.out.println("客户端 ok.."); //启动客户端去连接服务器端 //关于 ChannelFuture 要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //给关闭通道进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } }

自定义Handler

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //随机的发送Student 或者 Workder 对象 int random = new Random().nextInt(3); MyDataInfo.MyMessage myMessage = null; if(0 == random) { //发送Student 对象 myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build(); } else { // 发送一个Worker 对象 myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build(); } ctx.writeAndFlush(myMessage); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }

3.6 测试

在这里插入图片描述

参考

本文许多素材来自教程,比较推荐入门学习

  1. 尚硅谷韩顺平Netty视频教程

最后

以上就是专一音响最近收集整理的关于Netty(3)心跳检测、WebSocket、Protobuf一 心跳检测二 实现WebSocket服务器三 Protobuf参考的全部内容,更多相关Netty(3)心跳检测、WebSocket、Protobuf一内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(81)

评论列表共有 0 条评论

立即
投稿
返回
顶部