概述
文章目录
- 1. Netty简介
- 1.1.JDK原生NIO程序的问题
- 1.2.Netty的特点
- 1.3.Netty 常见的使用场景
- 1.4.Netty 高性能设计
- 1.4.1.I/O模型
- 1.4.1.1.BIO(Blocking IO): 传统的IO/阻塞式IO
- 1.4.1.2.NIO(Non-Blocking IO):新IO/非阻塞式IO
- 1.4.1.2.1.I/O复用模型
- 1.4.1.2.2.NIO的Selector模型
- 1.4.2.线程处理模型
- 1.4.2.1.事件驱动模型
- 1.4.2.2.Reactor线程模型
- 1.5.Netty的线程模型
- 1.5.1.简介
- 1.5.2.主从Reactor多线程模型
- 1.5.3.异步处理
- 1.5.3.1.简介
- 1.5.3.2.异步处理案例
- 1.6.Netty 架构设计
- 1.6.1.功能特性
- 1.6.2.组件模块
- 1.6.2.1.Bootstrap、ServerBootstrap
- 1.6.2.2.Future、ChannelFuture
- 1.6.2.3.Channel
- 1.6.2.4.Selector
- 1.6.2.5.NioEventLoop
- 1.6.2.6.NioEventLoopGroup
- 1.6.2.7.ChannelHandler
- 1.6.2.8.ChannelHandlerContext
- 1.6.2.9.ChannelPipline
- 1.6.3.Netty工作原理架构
- 1.6.3.1.服务端Netty Reactor架构
- 1.6.3.2.Netty工作原理
- 1.6.3.3.典型的初始化并启动 Netty 服务端的过程代码:
- 2.Netty案例
- 2.1.Netty的一个例子
- 2.2.Netty的通信过步骤:
- 2.3.Netty服务器端监听多个端口,接受多个请求
- 3.Netty粘包和拆包问题
- 3.1.问题描述
- 3.2.代码演示
- 3.3.解决方案
- 3.3.1.方案一:消息定长
- 3.3.1.1.说明:
- 3.3.1.2.代码演示
- 3.3.2.方案二:分隔符(建议使用!)
- 3.3.2.1.说明:
- 3.3.2.2.代码演示
- 3.3.3.方案三:自定义协议
- 3.3.3.1.说明:
- 4.Netty编解码技术
- 4.1.简介
- 4.2.JBoss Marshalling
- 4.2.1.简介
- 4.2.2.使用
- 5.数据通信
- 5.1.使用``长链接通道不断开``的形式进行通信
- 5.2.一次性批量提交数据,采用``短连接``的方式
- 5.3.我们可以使用一种``特殊的长链接``
- 6.总结
1. Netty简介
Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端.
它极大地简化并简化了TCP和UDP套接字服务器等网络编程;
1.1.JDK原生NIO程序的问题
JDK原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
①.NIO的类库和 API 繁杂,使用麻烦.你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等;
②.需要具备其他的额外技能做铺垫.例如熟悉 Java 多线程编程.因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序;
③.可靠性能力不齐,开发工作量和难度都非常大.例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等;
④.JDK原生NIO的Bug.例如臭名昭著的Epoll Bug,它会导致Selector 空轮询,最终导致CPU 100%.官方声称JDK1.6 版本的update 18 修复了该问题,但是直到JDK 1.7 版本该问题仍旧存在,只不过该Bug 发生概率降低了一些而已,它并没有被根本解决;
1.2.Netty的特点
Netty对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
①.
设计优雅:
适用于各种传输类型的统一 API,阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自3.1 起);
②.使用方便:
详细记录的Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了;
③.高性能,吞吐量更高:
延迟更低,减少资源消耗,最小化不必要的内存复制;
④.安全:
完整的 SSL/TLS 和 StartTLS 支持;
⑤.社区活跃,不断更新:
社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入;
1.3.Netty 常见的使用场景
1>.互联网行业:
在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用.典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信;
2>.游戏行业:
无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用.Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈. 非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信;
3>.大数据领域:
经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现;
1.4.Netty 高性能设计
Netty作为异步事件驱动的网络,高性能之处主要来自于其I/O模型和线程处理模型,
前者决定如何收发数据,后者决定如何处理数据;
1.4.1.I/O模型
用什么样的通道将数据发送给对方,BIO、NIO 或者 AIO,I/O 模型在很大程度上决定了框架的性能;
1.4.1.1.BIO(Blocking IO): 传统的IO/阻塞式IO
1>.模型图:
2>.特点:
①.每个请求都需要独立的线程完成数据Read,业务处理,数据 Write 的完整操作问题;
②.当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大;
③.连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费;
1.4.1.2.NIO(Non-Blocking IO):新IO/非阻塞式IO
Netty的非阻塞I/O的实现关键是基于I/O复用模型,这里用Selector对象表示;
1.4.1.2.1.I/O复用模型
1>.模型图:
2>.说明:
①.在I/O复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和阻塞I/O所不同的是这两个函数可以同时阻塞多个 I/O 操作;
②.而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用 I/O 操作函数;
1.4.1.2.2.NIO的Selector模型
1>.模型图:
2>.组件:
①.Selector:一般称为选择器,也可以翻译为多路复用器;
②.Socket状态:Connect(连接就绪),Accept(接受就绪),Read(读就绪),Write(写就绪);
3>.说明:
①.Netty的 IO 线程 NioEventLoop 由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端连接;
②.当线程从某客户端 Socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务;
③.线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道;
④.由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起;
⑤.一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升;
4>.特点:
①基于Buffer
A.传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节,因此也就不能随意改变读取指针的位置;
B.在 NIO中,抛弃了传统的I/O流,而是引入了 Channel 和 Buffer 的概念.在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel;
C.基于 Buffer 操作不像传统 IO 的顺序操作,NIO 中可以随意地读取任意位置的数据;
1.4.2.线程处理模型
数据报如何读取?读取之后的编解码在哪个线程进行?编解码后的消息如何派发?线程模型的不同,对性能的影响也非常大
1>.通常,我们设计一个事件处理模型的程序有两种思路:
①.
轮询方式:
线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑;
②.事件驱动方式:
发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件.事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路;
2>.以GUI 的逻辑处理为例,说明两种逻辑的不同:
①.
轮询方式:线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑;
②.事件驱动方式:发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑;
1.4.2.1.事件驱动模型
1>.模型图:
2>.组件:
①.事件队列(event queue):接收事件的入口,存储待处理事件;
②.分发器(event mediator):将不同的事件分发到不同的业务逻辑单元;
③.事件通道(event channel):分发器与处理器之间的联系渠道;
④.事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作;
3>.相对传统轮询模式,事件驱动有如下优点:
①.可扩展性好:分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑;
②.高性能:基于队列暂存事件,能方便并行异步处理事件;
1.4.2.2.Reactor线程模型
1>.简介:
①.Reactor是反应堆的意思,Reactor模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式;
②.服务端程序处理传入的多路请求,并将它们同步分派给请求对应的处理线程,Reactor模式也叫Dispatcher模式,即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch )给某进程,是编写高性能网络服务器的必备技术之一;
2>Reactor模型图:
3>.组件:
①.
Reactor:
Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对I/O事件做出反应.它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
②.Handlers:
处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员.Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作;
4>.针对Reactor的数量和Hanndler线程数量的不同,Reactor模型有3个变种:
①.单 Reactor 单线程;
②.单 Reactor 多线程;
③.主从 Reactor 多线程;
可以这样理解,Reactor就是一个执行 while(true){ selector.select(); …}循环的线程,会源源不断的产生新的事件,称作反应堆很贴切;
1.5.Netty的线程模型
1.5.1.简介
1>.Netty主要基于主从Reactors多线程模型做了一定的修改,
其中主从Reactor 多线程模型有多个Reactor;
1.5.2.主从Reactor多线程模型
1>.模型图:
2>.说明:
①.MainReactor:负责客户端的连接请求,并将请求转交给 SubReactor;
②.SubReactor:负责相应通道的IO读写请求;
③.非IO请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理;
注意:虽然Netty的线程模型基于主从Reactor多线程,借用了MainReactor和SubReactor的结构.但是实际上 SubReactor和Worker线程在同一个线程池中;
3>.源码:
EventLoopGroup bossGroup = newNioEventLoopGroup();
EventLoopGroup workerGroup = newNioEventLoopGroup();
ServerBootstrap server= newServerBootstrap();
server.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class);
/**
上面代码中的bossGroup和workerGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池:
I).bossGroup线程池则只是在Bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的Accept事件,每个端口对应一个Boss线程;
II).workerGroup线程池会被各个SubReactor和Worker线程充分利用;
*/
1.5.3.异步处理
1.5.3.1.简介
1>.异步的概念和同步相对.当一个异步过程调用发出后,调用者不能立刻得到结果.实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者;
2>.Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture;
3>.调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果;
4>.当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作;
5>.常见有如下操作:
①.通过 isDone()方法来判断当前操作是否完成;
②.通过isSuccess()方法来判断已完成的当前操作是否成功;
③.通过getCause()方法来获取已完成的当前操作失败的原因;
④.通过isCancelled()方法来判断已完成的当前操作是否被取消;
⑤.通过addListener()方法来注册监听器,当操作已完成(isDone()方法返回完成),将会通知指定的监听器;如果 Future对象已完成,则理解通知指定的监听器;
1.5.3.2.异步处理案例
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out. println( newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err. println( "端口["+ port + "]绑定失败!");
}
});
/**
I).以上的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑;
II).相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量;
*/
1.6.Netty 架构设计
1.6.1.功能特性
1>.如图:
2>.Netty功能特性如下:
①.
传输服务:
支持 BIO 和 NIO;
②.容器集成:
支持 OSGI、JBossMC、Spring、Guice 容器;
③.协议支持:
HTTP、Protobuf、二进制、文本、WebSocket等一系列常见协议都支持.还支持通过实行编码解码逻辑来实现自定义协议;
④.Core 核心:
可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象;
1.6.2.组件模块
1.6.2.1.Bootstrap、ServerBootstrap
Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,
Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类;
1.6.2.2.Future、ChannelFuture
在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理.但是可以过一会等它执行完成或者直接注册一个监听.
具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件;
1.6.2.3.Channel
1>.Netty 网络通信的组件,能够用于执行网络I/O 操作.Channel为用户提供:
①.判断当前网络连接的通道的状态(例如是否打开?是否已连接?);
②.设置网络连接的配置参数(例如接收缓冲区大小);
③.提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I/O调用都将立即返回,并且不保证在调用结束时所请求的I/O操作已完成.调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I/O操作成功、失败或取消时回调通知调用方;
④.支持关联I/O 操作与对应的处理程序;
2>.不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应.下面是一些常用的 Channel类型:
①.NioSocketChannel:异步的客户端 TCP Socket 连接;
②.NioServerSocketChannel:异步的服务器端 TCP Socket 连接;
③.NioDatagramChannel:异步的 UDP 连接;
④.NioSctpChannel:异步的客户端 Sctp 连接;
⑤.NioSctpServerChannel:异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO;
1.6.2.4.Selector
1>.Netty 基于Selector 对象实现I/O 多路复用
,通过Selector 一个线程可以监听多个连接的 Channel 事件;
2>.当向一个Selector中注册Channel后,Selector内部的机制就可以自动不断地查询(Select)这些注册的Channel是否有已就绪的I/O事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个Channel.
1.6.2.5.NioEventLoop
NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
①.I/O任务:即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys()方法触发;
②.非IO任务:添加到taskQueue中的任务,如 register0、bind0等任务,由runAllTasks方法触发;
注意:两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等
;
1.6.2.6.NioEventLoopGroup
NioEventLoopGroup主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel 上的事件,而一个Channel 只对应于一个线程;
1.6.2.7.ChannelHandler
1>.ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序;
2>.ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
①.ChannelInboundHandler:用于处理入站I/O事件;
②.ChannelOutboundHandler:用于处理出站I/O操作;
或者使用以下适配器类:
①.ChannelInboundHandlerAdapter:用于处理入站I/O 事件;
②.ChannelOutboundHandlerAdapter:用于处理出站I/O 操作;
③.ChannelDuplexHandler:用于处理入站和出站事件;
1.6.2.8.ChannelHandlerContext
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象;
1.6.2.9.ChannelPipline
1>.保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作;
2>.ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式以及Channel中各个的ChannelHandler如何相互交互;
3>.下图描述了ChannelPipeline中ChannelHandler通常如何处理 I/O 事件:
①.例如:
ChannelHandlerContext.fireChannelRead(Object)和 ChannelOutboundInvoker.write(Object)转发到其最近的处理程序; |
---|
②.说明:
I).I/O事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法;
II).入站事件由自下而上方向的入站处理程序处理,如图左侧所示.入站 Handler 处理程序通常处理由图底部的 I/O 线程生成的入站数据;
通常通过实际输入操作(例如 SocketChannel.read(ByteBuffer))从远程读取入站数据;
III).出站事件由上下方向处理,如图右侧所示.出站 Handler 处理程序通常会生成或转换出站传输,例如 write 请求;
I/O线程通常执行实际的输出操作,例如 SocketChannel.write(ByteBuffer);
4>.在Netty中每个Channel 都有且仅有一个ChannelPipeline与之对应,它们的组成关系如下:
①.如图:
②.说明:
I).如上图所示,一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler;
II).入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰;
1.6.3.Netty工作原理架构
1.6.3.1.服务端Netty Reactor架构
1>.如图:
2>.说明:
①.Server端包含1个Boss NioEventLoopGroup 和1个 Worker NioEventLoopGroup;
I).Boss NioEventLoopGroup:主线程组,监听所有连接到netty绑定端口的请求的准备就绪事件;
II).Worker NioEventLoopGroup:监听请求准备就绪之后要执行的事件;②.NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环;NioEventLoop,每个NioEventLoop 包含1个Selector 和1个事件循环线程;
③.每个Boss NioEventLoop循环执行的任务包含3步:I).轮询Accept事件;
II).处理Accept I/O 事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上;
III).处理任务队列中的任务,runAllTasks.任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务;④.每个Worker NioEventLoop循环执行的任务包含3步:
I).轮询 Read、Write 事件;
II).处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理;
III).处理任务队列中的任务,runAllTasks;⑤.其中任务队列中的Task有3种典型使用场景:
I).用户程序自定义的普通任务:
ctx.channel().eventLoop().execute( newRunnable() { @Override publicvoidrun(){ //... } });
II).非当前Reactor线程调用 Channel 的各种方法:
例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景.最终的 Write 会提交到任务队列中后被异步消费
III).用户自定义定时任务:ctx.channel().eventLoop().schedule( newRunnable() { @Override publicvoidrun(){ } }, 60, TimeUnit.SECONDS);
1.6.3.2.Netty工作原理
1>.netty服务器启动,绑定对应的端口,同时也监听着这个端口的请求,此后所有(客户端)给这个端口发送的数据netty服务器都可以接收到;
2>.netty服务器启动之后会初始化一个nioserversocketchannel通道,然后注册到selector中;
3>.selector内部会监听一个accept事件,表明channel已经打开/准备就绪,可以往netty务器中传输数据了;
4>.netty(内部selector)再与(多个)客户端建立连接,生成niosocketchannel通道,他就是与netty与客户端连接的一个通道;
5>.将niosocketchannel通道注册到selector中,在这个selector中会监听read和write事件;监听客户端的读写请求是否准备就绪;
6>.当读准备就绪,表示读请求发送的数据(channel)已经接受完成,netty服务器可以从通道中读取数据进行处理;写请求准备就绪表示可以往通道中发送响应;
7>.当读写事件都准备就绪之后,就进行事件处理(读事件会生成一个任务,写事件也会生成一个任务),然后将任务分发给任务队列,最后由netty执行任务队列中的任务;
8>.同样的,监听accept事件的selector在accept事件准备就绪之后也会生成对应的任务队列,最后由netty去完成;
9>.当请求的读写事件都准备就绪之后,accept事件就会被触发,然后再将请求连接到服务器,最后服务器分配一个线程进行处理;
1.6.3.3.典型的初始化并启动 Netty 服务端的过程代码:
publicstaticvoidmain(String[] args) {
// 创建mainReactor
NioEventLoopGroup boosGroup = newNioEventLoopGroup();
// 创建工作线程组
NioEventLoopGroup workerGroup = newNioEventLoopGroup();
final ServerBootstrap serverBootstrap = newServerBootstrap();
serverBootstrap
.group(boosGroup, workerGroup) // 组装NioEventLoopGroup
.channel(NioServerSocketChannel.class) // 设置channel类型为NIO类型
.option(ChannelOption.SO_BACKLOG, 1024) // 设置连接配置参数
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true) // 配置入站,出站事件handler
.childHandler( newChannelInitializer<NioSocketChannel>() {
@ Override
protectedvoidinitChannel(NioSocketChannel ch) {
// 配置入站、出站事件channel
ch.pipeline().addLast(...);
ch.pipeline().addLast(...);
}
});
// 绑定端口
int port = 8080;
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System. out.println( newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println( "端口["+ port + "]绑定失败!");
}
});
}
基本过程如下:
①.初始化创建2个NioEventLoopGroup,其中boosGroup用于Accetpt连接建立事件并分发请求,workerGroup用于处理I/O读写事件和业务逻辑;
②.基于 ServerBootstrap(服务端启动引导类),配置 EventLoopGroup、Channel 类型,连接参数、配置入站、出站事件handler;
③.绑定端口,开始工作;
2.Netty案例
2.1.Netty的一个例子
1>.服务器端代码:
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理已经被接收的连接(I/O读写)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024) //tcp缓冲区
.option(ChannelOption.SO_SNDBUF,32*1024)
.option(ChannelOption.SO_RCVBUF,32*1024)
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ServerHandler()); //接受数据之后的处理类
}
});
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
f.channel().closeFuture().sync();
// 资源释放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import java.io.UnsupportedEncodingException;
/**
* 服务端业务(数据)处理类
*/
public class ServerHandler extends ChannelHandlerAdapter {
// 读取客户端传输的channel中的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// 将channel中的数据转换成bytebuf
ByteBuf byteBuf= (ByteBuf) msg;
byte[] req= new byte[byteBuf.readableBytes()];
//将channel中的数据读取到缓冲区中byte中
byteBuf.readBytes(req);
//将byte[]中的内容转换成String类型的内容
String body=new String(req,"utf-8");
System.out.println("server:"+body);
// 响应给客户端
String response="Hi, Client";
//在创建一个线程,异步将数据写入channel中传递给客户端
//当一个写请求已经完成会自动关闭连接(/关闭客户端),变成了短连接
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()))
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2>.客户端代码:
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
// 数据传输,将输入写入到通道中,传入服务器端
f.channel().write(Unpooled.copiedBuffer("hello netty".getBytes()));
f.channel().flush(); //建议使用writeandflush,每写一条数就flush到channel中
f.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.util.ReferenceCountUtil;
import java.io.UnsupportedEncodingException;
/**
* 处理服务器端的响应数据
*/
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// 将channel中的数据转换成bytebuf
ByteBuf byteBuf= (ByteBuf) msg;
byte[] req= new byte[byteBuf.readableBytes()];
//将channel中的数据读取到缓冲区中byte中
byteBuf.readBytes(req);
//将byte[]中的内容转换成String类型的内容
String body=new String(req,"utf-8");
System.out.println("client:"+body);
// 释放缓冲区
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2.2.Netty的通信过步骤:
1>.服务端创建两个nio线程组,一个专门用于网络事件处理(即接收客户端的链接),另一个则进行网络通信读写;
2>.创建一个serverbootstrap对象,配置netty的一系列参数,例如接收传出的数据的缓存大小;
3>.创建一个实例处理数据的类channelinitializer,进行初始化准备工作,例如设置接收传出数据的字符集,格式等;
4>.绑定端口,执行同步阻塞方法等待服务器端启动;
2.3.Netty服务器端监听多个端口,接受多个请求
1>.服务器端绑定多个端口,监听多个客户端的请求,代码如下:
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
ChannelFuture f2 = b.bind(8764).sync();
f.channel().closeFuture().sync();
f2.channel().closeFuture().sync();
2>.客户端发送多个请求,代码如下:
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
ChannelFuture f2=b.connect("127.0.0.1",8764).sync();
// 数据传输,将输入写入到通道中,传入服务器端
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765".getBytes()));
f2.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8764".getBytes()));
3.Netty粘包和拆包问题
3.1.问题描述
1>.在TCP/IP的基于流的传输中,接收的数据被存储到套接字接收缓冲器中.不幸的是,基于流的传输的缓冲器不是分组的队列,而是字节的队列.
这意味着,即使将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一组字节(有点悲剧). 因此,不能保证读的是您在远程定入的行数据. 例如,假设操作系统的TCP/IP堆栈已收到三个数据包:
如图:
2>.由于基于流的协议的这种通用属性,在应用程序中以下面的碎片形式(只是其中的一种)读取它们的机会很高:
如图:
3>.因此,接收部分,无论是服务器侧还是客户端侧,都应该将接收到的数据碎片整理成逻辑可由应用容易地理解的一个或多个有意义的帧. 在上述示例的情况下,接收的数据应该如下成帧:
如图:
3.2.代码演示
1>.客户端代码:
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
// 数据传输,将输入写入到通道中,传入服务器端
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765".getBytes()));
f.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
2>.服务端代码:
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理已经被接收的连接(I/O读写)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024) //tcp缓冲区
.option(ChannelOption.SO_SNDBUF,32*1024)
.option(ChannelOption.SO_RCVBUF,32*1024)
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ServerHandler()); //接受数据之后的处理类
}
});
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
ChannelFuture f2 = b.bind(8764).sync();
f.channel().closeFuture().sync();
f2.channel().closeFuture().sync();
// 资源释放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
3>.服务端接受的内容:
server:hello 8765hello 8765hello 8765
//数据没有分开,服务器还没来得及处理下一次请求就过来了,所以服务器把他们当成一次请求进行处理了!
4>.服务器端响应给客户端的内容:
client:Hi, Client
//只响应了一次,服务器将客户端的三次请求当成了一次处理!
3.3.解决方案
3.3.1.方案一:消息定长
3.3.1.1.说明:
每个报文的大小固定为200个字节,如果不够,空位补空格
3.3.1.2.代码演示
1>.服务端代码:
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理已经被接收的连接(I/O读写)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024) //tcp缓冲区
.option(ChannelOption.SO_SNDBUF,32*1024)
.option(ChannelOption.SO_RCVBUF,32*1024)
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//设置定长字符串接受/每次接受到的消息内容的长度达到指定的长度之后就认为这是一次请求
//无论客户端发送几次请求,服务器端每次只取定长内容作为一次请求处理
ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
//设置字符串形式的解码,将消息转换成字符串形式
ch.pipeline().addLast(new StringDecoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ServerHandler()); //接受数据之后的处理类
}
});
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
f.channel().closeFuture().sync();
// 资源释放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
// 读取客户端传输的channel中的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
String request = (String) msg;
System.out.println("server:" + request);
//响应给客户端一个字符串形式的数据
String response = request;
//在创建一个线程,异步将数据写入channel中传递给客户端
//当一个写请求已经完成会自动关闭连接(/关闭客户端)
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2>.客户端代码:
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//设置定长字符串接受/每次接受到的消息内容的长度达到指定的长度之后就认为这是一次请求
//如果最后一次消息长度超过了指定长度,那么超出的部分会被丢弃掉
//要想不被丢弃可以使用空格补位,补到指定长度的倍数
ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
//设置字符串形式的解码,将消息转换成字符串形式
ch.pipeline().addLast(new StringDecoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
// 数据传输,将输入写入到通道中,传入服务器端
f.channel().writeAndFlush(Unpooled.copiedBuffer("aaaaabbbbb".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("cccc".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("dddddd".getBytes()));
f.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
String response= (String) msg;
System.out.println("响应数据:"+response);
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3.3.2.方案二:分隔符(建议使用!)
3.3.2.1.说明:
在每个数据包的尾部增加特殊的字符进行分割,例如回车/空格/””等
3.3.2.2.代码演示
1>.服务端代码:
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理已经被接收的连接(I/O读写)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024) //tcp缓冲区
.option(ChannelOption.SO_SNDBUF,32*1024)
.option(ChannelOption.SO_RCVBUF,32*1024)
.option(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//设置特殊分隔符,只要数据中包含了指定的特殊字符,那么就认为这是一次请求
ByteBuf byteBuf= Unpooled.copiedBuffer("$_".getBytes());
//在管道中对消息进行分割/过滤
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
//设置字符串形式的解码,将消息转换成字符串形式
ch.pipeline().addLast(new StringDecoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ServerHandler()); //接受数据之后的处理类
}
});
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
f.channel().closeFuture().sync();
// 资源释放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
// 读取客户端传输的channel中的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
String request = (String) msg;
System.out.println("server:" + request);
//响应给客户端的数据
String response = "我是响应数据$_";
//在创建一个线程,异步将数据写入channel中传递给客户端
//当一个写请求已经完成会自动关闭连接(/关闭客户端)
ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
2>.客户端代码:
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//设置特殊分隔符,只要数据中包含了指定的特殊字符,那么就认为这是一次请求
ByteBuf byteBuf= Unpooled.copiedBuffer("$_".getBytes());
//在管道中对消息进行分割/过滤
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
//设置字符串形式的解码,将消息转换成字符串形式
ch.pipeline().addLast(new StringDecoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
// 数据传输,将输入写入到通道中,传入服务器端
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765$_".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765$_".getBytes()));
f.channel().writeAndFlush(Unpooled.copiedBuffer("hello 8765$_".getBytes()));
f.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
String response= (String) msg;
System.out.println("响应数据:"+response);
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
3.3.3.方案三:自定义协议
3.3.3.1.说明:
将消息分为消息头和消息体,在消息头中包含表示消息总长度的字段,然后进行业务逻辑的处理
4.Netty编解码技术
4.1.简介
1>.编解码技术,说白了就是java序列化技术,
序列化的目的有两个:网络传输和对象持久化;
2>.虽然我们可以使用java进行对象的序列化,然后使用netty去传输,但是java序列化的硬伤太多,比如java序列化无法跨语言,序列化后码流太大,序列化性能太差等等;
3>.主流的序列化框架:
①.JBoss的marshalling包;
②.Google的protobuf/protostuf;
③.基于protobuf的kyro;
④.messagepack框架;
…
4.2.JBoss Marshalling
4.2.1.简介
JBoss Marshalling是一个java对象序列化包,他对jdk默认的序列化框架进行了优化,但是又跟java.io.seriallizable接口保持兼容,同时又增加了一些可调的参数和附加特性
;
4.2.2.使用
1>.jar包依赖:
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling</artifactId>
<version>2.0.0.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.marshalling</groupId>
<artifactId>jboss-marshalling-serial</artifactId>
<version>2.0.0.Final</version>
</dependency>
2>.创建Marshalling序列化工厂:
/**
* Marshalling序列化工厂
*/
public class MarshallingCodeCFactory {
/**
* 解码
* @return
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象.
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024*1024*1);
return decoder;
}
/**
* 编码
* @return
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
3>.客户端:
public class Client {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//使用Marshalling编解码技术
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 8765).sync();
//定义要传输的对象
for (int i = 0; i < 2; i++) {
User user = new User();
user.setId(i).setUserName("request"+i).setEmail("req@163.com"+i).setAge(22+i);
f.channel().writeAndFlush(user);
}
f.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
User user= (User) msg;
System.out.println("响应数据:" + user);
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
4>.服务端:
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //用来接收进来的连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //用来处理已经被接收的连接(I/O读写)
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.SO_SNDBUF,32*1024)
.option(ChannelOption.SO_RCVBUF,32*1024)
.option(ChannelOption.SO_KEEPALIVE,true)
.handler(new LoggingHandler(LogLevel.INFO)) //日志处理器,打印日志
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//使用Marshalling编解码技术
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
//这句代码要放在最后,等到消息过滤/转换之后才去处理
ch.pipeline().addLast(new ServerHandler()); //接受数据之后的处理类
}
});
// 同步监听客户端的链接
ChannelFuture f = b.bind(8765).sync();
f.channel().closeFuture().sync();
// 资源释放
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public class ServerHandler extends ChannelHandlerAdapter {
// 读取客户端传输的channel中的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
// 直接转换成user对象
User user = (User) msg;
System.out.println("server:"+user);
//响应给客户端的数据
User user1=new User();
user1.setAge(23).setEmail("resp@163.com").setUserName("resp").setId(1);
ctx.writeAndFlush(user1);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
5>.java对象:
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
public class User implements Serializable {
private Integer id;
private String userName;
private String email;
private int age;
}
5.数据通信
5.1.使用长链接通道不断开
的形式进行通信
也就是服务器和客户端的通道一直处于开启状态,
如果服务器性能足够好,并且客户端数据量比较少的情况下,我们还是推荐使用这种方式;
5.2.一次性批量提交数据,采用短连接
的方式
将数据保存在本地的临时缓冲区或者临时表中,当达到临界值时进行一次性批量提交,又或者根据定时任务轮询提交,
这种情况的弊端是做不到实时性传输,在对实时性不高的应用中可以推荐使用;
5.3.我们可以使用一种特殊的长链接
指定某一时间之内,如果服务器与某台客户端,没有任何通信,那么就断开连接;下次连接则是客户端向服务器发送请求的时候,再次建立链接,
这种模式需要考虑两个因素:
1>.如何在超时(即服务器和客户端没有任何通信)后关闭通道?
客户端主动向服务器发起链接
①.在客户端指定,如果在指定的时间段内客户端和服务器没有任何数据通信,那么客户端就和服务器断开连接:
ch.pipeline().addLast(new ReadTimeoutHandler(5)); //这是netty5新增的特性
②.在服务器端指定,如果在指定的时间段内客户端和服务器没有任何数据通信,那么客户端就和服务器断开连接:
ch.pipeline().addLast(new ReadTimeoutHandler(5)); //这是netty5新增的特性
2>.关闭通道后如何再次建立链接?
每次发送消息之前判断当前的channel是否被初始化(/对象的实例是否为空),如果没有被初始化,那么就进行connect,得到一个channel;如果已经被初始化,那么在判断channel的链接是否正常(isactive),如果不正常,重新connect,得到channel,进行通信;
注意:客户端宕机了,无需考虑,下次客户端重启之后就可以与服务器重新建立链接;但是服务器宕机时,我们的客户端如何与服务器进行连接?
使用定时任务,写一个脚本,每个一段时间(先启动客户端)定时给服务器发送一次请求,看看服务器是否宕机,如果服务器宕机了,那么本次请求失败处理;然后再隔一段时间(先启动客户端)再次给服务器发送一次请求;
6.总结
现在稳定推荐使用的主流版本还是 Netty4,Netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显,所以这个版本不推荐使用,官网也没有提供下载链接;
最后
以上就是甜蜜唇彩为你收集整理的Netty相关知识的全部内容,希望文章能够帮你解决Netty相关知识所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复