概述
Netty进阶
一、Netty 核心模块组件
(1) Bootstrap、ServerBootstrap
-
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
-
常见的方法有
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup),该方法用于服务器端, 用来设置两个 EventLoop
public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道实现
public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
public ServerBootstrap childHandler(ChannelHandler childHandler), 该方法用来设置业务处理类( 自定义的
handler)
public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连接服务器端
(2) Future、ChannelFuture
Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
常见的方法有
Channel channel(),返回当前正在进行 IO 操作的通道
ChannelFuture sync(),等待异步操作执行完毕
(3) Channel
-
Netty 网络通信的组件,能够用于执行网络 I/O 操作。
-
通过 Channel 可获得当前网络连接的通道的状态
-
通过 Channel 可获得 网络连接的配置参数 (例如接收缓冲区大小)
-
Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
-
调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方
-
支持关联 I/O 操作与对应的处理程序
-
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。
常用的 Channel 类型:
-
NioSocketChannel,异步的客户端 TCP Socket 连接。
-
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
-
NioDatagramChannel,异步的 UDP 连接。
-
NioSctpChannel,异步的客户端 Sctp 连接。
-
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
(4) Selector
-
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
-
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
(5) ChannelHandler 及其实现类
- ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)
中的下一个处理程序。
-
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类
-
ChannelHandler 及其实现类一览图(后)
- 我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.fireExceptionCaught(cause);
}
}
(6) Pipeline 和 ChannelPipeline
ChannelPipeline 是一个重点:
-
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解:ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截Channel 的入站事件和出站操作)
-
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel中各个的 ChannelHandler 如何相互交互
-
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下
- 常用方法
ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置
(7) ChannelHandlerContext
-
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象
-
即 ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用.
-
常用方法
(8) ChannelOption
-
Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。
-
ChannelOption 参数如下:
(9) EventLoopGroup 和其实现类 NioEventLoopGroup
-
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop同时工作,每个 EventLoop 维护着一个 Selector 实例。
-
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服 务 器 端 编 程 中 , 我 们 一 般 都 需 要 提 供 两 个 EventLoopGroup , 例 如 : BossEventLoopGroup 和WorkerEventLoopGroup。
-
通常一个服务端口即一个 ServerSocketChannel 对应一个 Selector 和一个EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理,如下图所示
- 常用方法
public NioEventLoopGroup(),构造方法
public Future<?> shutdownGracefully(),断开连接,关闭线程
(10) Unpooled
-
Netty 提供一个专门用来操作缓冲区(即 Netty 的数据容器)的工具类
-
常用方法如下所示
-
举例说明 Unpooled 获取 Netty 的数据容器 ByteBuf 的基本使用 【案例演示】
案例 1
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class NettyByteBuf01 {
public static void main(String[] args) {
//创建一个ByteBuf
//说明
//1. 创建 对象,该对象包含一个数组arr , 是一个byte[10]
//2. 在netty 的buffer中,不需要使用flip 进行反转
// 底层维护了 readerindex 和 writerIndex
//3. 通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域
// 0---readerindex 已经读取的区域
// readerindex---writerIndex , 可读的区域
// writerIndex -- capacity, 可写的区域
ByteBuf buffer = Unpooled.buffer(10);
for(int i = 0; i < 10; i++) {
buffer.writeByte(i);
}
System.out.println("capacity=" + buffer.capacity());//10
//输出
// for(int i = 0; i<buffer.capacity(); i++) {
// System.out.println(buffer.getByte(i));
// }
for(int i = 0; i < buffer.capacity(); i++) {
System.out.println(buffer.readByte());
}
System.out.println("执行完毕");
}
}
案例2
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.Charset;
public class NettyByteBuf02 {
public static void main(String[] args) {
//创建ByteBuf
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
//使用相关的方法
if(byteBuf.hasArray()) { // true
byte[] content = byteBuf.array();
//将 content 转成字符串
System.out.println(new String(content, Charset.forName("utf-8")));
System.out.println("byteBuf=" + byteBuf);
System.out.println(byteBuf.arrayOffset()); // 0
System.out.println(byteBuf.readerIndex()); // 0
System.out.println(byteBuf.writerIndex()); // 12
System.out.println(byteBuf.capacity()); // 36
//System.out.println(byteBuf.readByte()); //
System.out.println(byteBuf.getByte(0)); // 104
int len = byteBuf.readableBytes(); //可读的字节数 12
System.out.println("len=" + len);
//使用for取出各个字节
for(int i = 0; i < len; i++) {
System.out.println((char) byteBuf.getByte(i));
}
//按照某个范围读取
System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8")));
System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8")));
}
}
}
二、Netty 应用实例-群聊系统
实例要求:
-
编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
-
实现多人群聊
-
服务器端:可以监测用户上线,离线,并实现消息转发功能
-
客户端:通过 channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
-
目的:进一步理解 Netty 非阻塞网络编程机制
-
代码演示
(1) GroupChatServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; // 监听端口
public GroupChatServer(int port) {
this.port = port;
}
// 编写run方法,处理客户端的请求
public void run() throws Exception {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 8个NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 获取到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 向pipeline加入解码器
pipeline.addLast("decoder", new StringDecoder());
// 向pipeline加入编码器
pipeline.addLast("encoder", new StringEncoder());
// 加入自己的业务处理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 服务器启动");
ChannelFuture channelFuture = b.bind(port).sync();
// 监听关闭
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
(2) GroupChatServerHandler.java
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
// public static List<Channel> channels = new ArrayList<Channel>();
// 使用一个hashmap 管理
// public static Map<String, Channel> channels = new HashMap<String,Channel>();
// 定义一个channle 组,管理所有的channel
// GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// handlerAdded 表示连接建立,一旦连接,第一个被执行
// 将当前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 将该客户加入聊天的信息推送给其它在线的客户端
/*
* 该方法会将 channelGroup 中所有的channel 遍历,并发送 消息, 我们不需要自己遍历
*/
channelGroup
.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " n");
channelGroup.add(channel);
}
// 断开连接, 将xx客户离开信息推送给当前在线的客户
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了n");
System.out.println("channelGroup size" + channelGroup.size());
}
// 表示channel 处于活动状态, 提示 xx上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 上线了~");
}
// 表示channel 处于不活动状态, 提示 xx离线了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 离线了~");
}
// 读取数据
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 获取到当前channel
Channel channel = ctx.channel();
// 这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
channelGroup.forEach(ch -> {
if (channel != ch) {
// 不是当前的channel,转发消息
ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "n");
} else {
// 回显自己发送的消息给自己
ch.writeAndFlush("[自己]发送了消息" + msg + "n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭通道
ctx.close();
}
}
(3) GroupChatClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
// 属性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 得到pipeline
ChannelPipeline pipeline = ch.pipeline();
// 加入相关handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
// 加入自定义的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// 得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress() + "--------");
// 客户端需要输入信息,创建一个扫描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
// 通过channel 发送到服务器端
channel.writeAndFlush(msg + "rn");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
(4) GroupChatClientHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
(5) User
public class User {
private int id;
private String pwd;
}
三、Netty 心跳检测机制案例
实例要求:
-
编写一个 Netty 心跳检测机制案例, 当服务器超过 3 秒没有读时,就提示读空闲
-
当服务器超过 5 秒没有写操作时,就提示写空闲
-
实现当服务器超过 7 秒没有读或者写操作时,就提示读写空闲
代码如下:
(1)MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class MyServer {
public static void main(String[] args) throws Exception {
// 创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 加入一个netty 提供 IdleStateHandler
/*
* 说明
* 1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
* 2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
* 3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
* 4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
* 5. 文档说明 triggers an {@link IdleStateEvent} when a {@link Channel} has not
* performed read, write, or both operation for a while.
* 6. 当 IdleStateEvent 触发后 , 就会传递给管道
* 的下一个handler去处理 通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
*/
pipeline.addLast(new IdleStateHandler(7000, 7000, 10, TimeUnit.SECONDS));
// 加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
}
});
// 启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
(2)MyServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理..");
// 如果发生空闲,我们关闭通道
// ctx.channel().close();
}
}
}
四、Netty 通过 WebSocket 编程实现服务器和客户端长连接
实例要求:
-
Http 协议是无状态的, 浏览器和服务器间的请求响应一次,下一次会重新创建连接.
-
要求:实现基于 webSocket 的长连接的全双工的交互
-
改变 Http 协议多次请求的约束,实现长连接了, 服务器可以发送消息给浏览器
-
客户端浏览器和服务器端会相互感知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知
-
运行界面
- 代码演示
(1) MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class MyServer {
public static void main(String[] args) throws Exception{
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明
1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
说明
1. 对应websocket ,它的数据是以 帧(frame) 形式传递
2. 可以看到WebSocketFrame 下面有六个子类
3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
5. 是通过一个 状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
//自定义的handler ,处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
(2) MyTextWebSocketFrameHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.time.LocalDateTime;
/**
* 这里 TextWebSocketFrame 类型,表示一个文本帧(frame)
*/
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务器收到消息 " + msg.text());
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg.text()));
}
// 当web客户端连接后, 触发方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close(); // 关闭连接
}
}
(3) hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<script>
var socket;
//判断当前浏览器是否支持websocket
if(window.WebSocket) {
//go on
socket = new WebSocket("ws://localhost:7000/hello2");
//相当于channelReado, ev 收到服务器端回送的消息
socket.onmessage = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "n" + ev.data;
}
//相当于连接开启(感知到连接开启)
socket.onopen = function (ev) {
var rt = document.getElementById("responseText");
rt.value = "连接开启了.."
}
//相当于连接关闭(感知到连接关闭)
socket.onclose = function (ev) {
var rt = document.getElementById("responseText");
rt.value = rt.value + "n" + "连接关闭了.."
}
} else {
alert("当前浏览器不支持websocket")
}
//发送消息到服务器
function send(message) {
if(!window.socket) { //先判断socket是否创建好
return;
}
if(socket.readyState == WebSocket.OPEN) {
//通过socket 发送消息
socket.send(message)
} else {
alert("连接没有开启");
}
}
</script>
<form onsubmit="return false">
<textarea name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发生消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
</body>
</html>
五、Google Protobuf
(1)编码和解码的基本介绍
编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码 [示意图]
codec(编解码器的组成部分有两个:解码器和 编码器。 decoder负责把字节码数据转换成业务数据
(2)Netty 本身的编码解码的机制和问题分析
-
Netty 自身提供了一些 codec(编解码器)
-
Netty 提供的编码器
StringEncoder,对字符串数据进行编码ObjectEncoder,对 Java 对象进行编码
- Netty 提供的解码器
StringDecoder, 对字符串数据进行解码
ObjectDecoder,对 Java 对象进行解码
-
Netty 本身自带的 ObjectDecoder 和 ObjectEncoder 可以用来实现 POJO 对象或各种业务对象的编码和解码,底层使用的仍是 Java 序列化技术 , 而 Java 序列化技术本身效率就不高,存在如下问题无法跨语言序列化后的体积太大,是二进制编码的 5 倍多。序列化性能太低
-
引出 新的解决方案 [Google 的 Protobuf]
(3)Protobuf
-
Protobuf 基本介绍和使用示意图
-
Protobuf 是 Google 发布的开源项目,全称 Google Protocol Buffers,是一种轻便高效的结构化数据存储格式, 可以用于结构化数据串行化,或者说序列化。它很适合做数据存储或 RPC[远程过程调用 remote procedure call ] 数据交换格式 。
目前很多公司 http+json tcp+protobuf
-
参考文档 : https://developers.google.com/protocol-buffers/docs/proto 语言指南
-
Protobuf 是以 message 的方式来管理数据的.
-
支持跨平台、跨语言,即[客户端和服务器端可以是不同的语言编写的] (支持目前绝大多数语言,例如 C++、C#、Java、python 等)
-
高性能,高可靠性
-
使用 protobuf 编译器能自动生成代码,Protobuf 是将类的定义使用.proto 文件进行描述。说明,在 idea 中编写 .proto 文件时,会自动提示是否下载 .ptotot 编写插件. 可以让语法高亮。
-
然后通过 protoc.exe 编译器根据.proto 自动生成.java 文件
-
protobuf 使用示意图
(4)Protobuf 快速入门实例
编写程序,使用 Protobuf 完成如下功能
-
客户端可以发送一个 Student PoJo 对象到服务器 (通过 Protobuf 编码)
-
服务端能接收 Student PoJo 对象,并显示信息(通过 Protobuf 解码)
-
具体演示步骤
【1】Student.proto
编译
protoc.exe --java_out=. Student.proto
将生成的 StudentPOJO 放入到项目使用
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象
int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值
string name = 2;
}
【2】NettyServerHandler
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.util.CharsetUtil;
/*
说明
1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
2. 这时我们自定义一个Handler , 才能称为一个handler
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<StudentPOJO.Student> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, StudentPOJO.Student msg) throws Exception {
//读取从客户端发送的StudentPojo.Student
System.out.println("客户端发送的数据 id=" + msg.getId() + " 名字=" + msg.getName());
}
// //读取数据实际(这里我们可以读取客户端发送的消息)
// /*
// 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
// 2. Object msg: 就是客户端发送的数据 默认Object
// */
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
// //读取从客户端发送的StudentPojo.Student
//
// StudentPOJO.Student student = (StudentPOJO.Student) msg;
//
// System.out.println("客户端发送的数据 id=" + student.getId() + " 名字=" + student.getName());
// }
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
【3】NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【4】NettyClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//发生一个Student 对象到服务器
StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(4).setName("智多星 吴用").build();
//Teacher , Member ,Message
ctx.writeAndFlush(student);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
【5】NettyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
(5) Protobuf 快速入门实例 2
-
编写程序,使用 Protobuf 完成如下功能
-
客户端可以随机发送 Student PoJo/ Worker PoJo 对象到服务器 (通过 Protobuf 编码)
-
服务端能接收 Student PoJo/ Worker PoJo 对象(需要判断是哪种类型),并显示信息(通过 Protobuf 解码)
-
具体演示步骤
【1】MyMessage.proto
syntax = "proto3";
option optimize_for = SPEED; // 加快解析
option java_package="com.atguigu.netty.codec2"; //指定生成到哪个包下
option java_outer_classname="MyDataInfo"; // 外部类名, 文件名
//protobuf 可以使用message 管理其他的message
message MyMessage {
//定义一个枚举类型
enum DataType {
StudentType = 0; //在proto3 要求enum的编号从0开始
WorkerType = 1;
}
//用data_type 来标识传的是哪一个枚举类型
DataType data_type = 1;
//表示每次枚举类型最多只能出现其中的一个, 节省空间
oneof dataBody {
Student student = 2;
Worker worker = 3;
}
}
message Student {
int32 id = 1;//Student类的属性
string name = 2; //
}
message Worker {
string name=1;
int32 age=2;
}
【2】NettyServerHandler
/*
说明
1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
2. 这时我们自定义一个Handler , 才能称为一个handler
*/
//public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
//根据dataType 来显示不同的信息
MyDataInfo.MyMessage.DataType dataType = msg.getDataType();
if(dataType == MyDataInfo.MyMessage.DataType.StudentType) {
MyDataInfo.Student student = msg.getStudent();
System.out.println("学生id=" + student.getId() + " 学生名字=" + student.getName());
} else if(dataType == MyDataInfo.MyMessage.DataType.WorkerType) {
MyDataInfo.Worker worker = msg.getWorker();
System.out.println("工人的名字=" + worker.getName() + " 年龄=" + worker.getAge());
} else {
System.out.println("传输的类型不正确");
}
}
// //读取数据实际(这里我们可以读取客户端发送的消息)
// /*
// 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
// 2. Object msg: 就是客户端发送的数据 默认Object
// */
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//
// //读取从客户端发送的StudentPojo.Student
//
// StudentPOJO.Student student = (StudentPOJO.Student) msg;
//
// System.out.println("客户端发送的数据 id=" + student.getId() + " 名字=" + student.getName());
// }
//数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
【3】NettyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline加入ProtoBufDecoder
//指定对哪种对象进行解码
pipeline.addLast("decoder", new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【4】NettyClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//当通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//随机的发送Student 或者 Workder 对象
int random = new Random().nextInt(3);
MyDataInfo.MyMessage myMessage = null;
if(0 == random) { //发送Student 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.StudentType).setStudent(MyDataInfo.Student.newBuilder().setId(5).setName("玉麒麟 卢俊义").build()).build();
} else { // 发送一个Worker 对象
myMessage = MyDataInfo.MyMessage.newBuilder().setDataType(MyDataInfo.MyMessage.DataType.WorkerType).setWorker(MyDataInfo.Worker.newBuilder().setAge(20).setName("老李").build()).build();
}
ctx.writeAndFlush(myMessage);
}
//当通道有读取事件时,会触发
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
【5】NettyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//在pipeline中加入 ProtoBufEncoder
pipeline.addLast("encoder", new ProtobufEncoder());
pipeline.addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
六、Netty 编解码器和 handler 的调用机制
(1)基本说明
-
netty 的组件设计:Netty 的主要组件有 Channel、EventLoop、ChannelFuture、ChannelHandler、ChannelPipe 等
-
ChannelHandler 充当了处理入站和出站数据的应用程序逻辑的容器。例如,实现 ChannelInboundHandler 接口(或ChannelInboundHandlerAdapter),你就可以接收入站事件和数据,这些数据会被业务逻辑处理。当要给客户端发 送 响 应 时 , 也 可 以 从 ChannelInboundHandler 冲 刷 数 据 。 业 务 逻 辑 通 常 写 在 一 个 或 者 多 个ChannelInboundHandler 中。ChannelOutboundHandler 原理一样,只不过它是用来处理出站数据的
-
ChannelPipeline 提供了 ChannelHandler 链的容器。以客户端应用程序为例,如果事件的运动方向是从客户端到服务端的, 那么我们称这些事件为出站的, 即客户端发送给服务端的数据会通过 pipeline 中的一系列ChannelOutboundHandler,并被这些 Handler 处理,反之则称为入站的
(2)编码解码器
-
当 Netty 发送或者接受一个消息的时候,就将会发生一次数据转换。入站消息会被解码:从字节转换为另一种格式(比如 java 对象);如果是出站消息,它会被编码成字节。
-
Netty 提供一系列实用的编解码器,他们都实现了 ChannelInboundHadnler 或者 ChannelOutboundHandler 接口。在这些类中,channelRead 方法已经被重写了。以入站为例,对于每个从入站 Channel 读取的消息,这个方法会被调用。随后,它将调用由解码器所提供的 decode()方法进行解码,并将已经解码的字节转发给 ChannelPipeline 中的下一个 ChannelInboundHandler。
(3)解码器ByteToMessageDecoder
- 关系继承图
-
由于不可能知道远程节点是否会一次性发送一个完整的信息,tcp 有可能出现粘包拆包的问题,这个类会对入站数据进行缓冲,直到它准备好被处理.
-
一个关于 ByteToMessageDecoder 实例分析
(4)handler链调用机制
实例要求:
- 使用自定义的编码器和解码器来说明 Netty 的 handler 调用机制
客户端发送 long -> 服务器
服务端发送 long -> 客户端
- 案例演示
【1】MyServer
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【2】MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();//一会下断点
//入站的handler进行解码 MyByteToLongDecoder
pipeline.addLast(new MyByteToLongDecoder());
// pipeline.addLast(new MyByteToLongDecoder2());
//出站的handler进行编码
pipeline.addLast(new MyLongToByteEncoder());
//自定义的handler 处理业务逻辑
pipeline.addLast(new MyServerHandler());
System.out.println("xx");
}
}
【3】MyServerHandler
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("从客户端" + ctx.channel().remoteAddress() + " 读取到long " + msg);
//给客户端发送一个long
ctx.writeAndFlush(98765L);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
【4】MyLongToByteEncoder
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
//编码方法
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("MyLongToByteEncoder encode 被调用");
System.out.println("msg=" + msg);
out.writeLong(msg);
}
}
【5】MyByteToLongDecoder
public class MyByteToLongDecoder extends ByteToMessageDecoder {
/**
*
* decode 会根据接收的数据,被调用多次, 直到确定没有新的元素被添加到list
* , 或者是ByteBuf 没有更多的可读字节为止
* 如果list out 不为空,就会将list的内容传递给下一个 channelinboundhandler处理, 该处理器的方法也会被调用多次
*
* @param ctx 上下文对象
* @param in 入站的 ByteBuf
* @param out List 集合,将解码后的数据传给下一个handler
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder 被调用");
//因为 long 8个字节, 需要判断有8个字节,才能读取一个long
if(in.readableBytes() >= 8) {
out.add(in.readLong());
}
}
}
【6】MyClient
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
【7】MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个出站的handler 对数据进行一个编码
pipeline.addLast(new MyLongToByteEncoder());
//这时一个入站的解码器(入站handler )
pipeline.addLast(new MyByteToLongDecoder());
// pipeline.addLast(new MyByteToLongDecoder2());
//加入一个自定义的handler , 处理业务
pipeline.addLast(new MyClientHandler());
}
}
【8】MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println("服务器的ip=" + ctx.channel().remoteAddress());
System.out.println("收到服务器消息=" + msg);
}
//重写channelActive 发送数据
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("MyClientHandler 发送数据");
//ctx.writeAndFlush(Unpooled.copiedBuffer(""))
ctx.writeAndFlush(123456L); //发送的是一个long
//分析
//1. "abcdabcdabcdabcd" 是 16个字节
//2. 该处理器的前一个handler 是 MyLongToByteEncoder
//3. MyLongToByteEncoder 父类 MessageToByteEncoder
//4. 父类 MessageToByteEncoder
/*
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) { //判断当前msg 是不是应该处理的类型,如果是就处理,不是就跳过encode
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
}
4. 因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
*/
// ctx.writeAndFlush(Unpooled.copiedBuffer("abcdabcdabcdabcd",CharsetUtil.UTF_8));
}
}
- 结论
不论解码器 handler 还是 编码器 handler 即接收的消息类型必须与待处理的消息类型一致,否则该 handler 不会被执行;在解码器 进行数据解码时,需要判断 缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致。
(5)解码器-ReplayingDecoder
-
public abstract class ReplayingDecoder
extends ByteToMessageDecoder -
ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使用这个类,我们不必调用 readableBytes()方法。参数 S指定了用户状态管理的类型,其中 Void 代表不需要状态管理
-
应用实例:使用 ReplayingDecoder 编写解码器,对前面的案例进行简化 [案例演示]
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyByteToLongDecoder2 被调用");
//在 ReplayingDecoder 不需要判断数据是否足够读取,内部会进行处理判断
out.add(in.readLong());
}
}
- ReplayingDecoder 使用方便,但它也有一些局限性:
-
并不是所有的 ByteBuf 操作都被支持, 如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。
-
ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时, 消息会被拆成了多个碎片,速度变慢
(6)其它编解码器
【1】其它解码器
-
LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(n 或者rn)作为分隔符来解析数据。
-
DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
-
HttpObjectDecoder:一个 HTTP 数据的解码器
-
LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
【2】 其它编码器
七、Log4j 整合到Netty
(1)在 Maven 中添加对 Log4j 的依赖 在 pom.xml
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
(2)配置 Log4j , 在 resources/log4j.properties
log4j.rootLogger=DEBUG, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%p] %C{1} - %m%n
(3) 演示整合
八、TCP粘包和拆包及解决方案
(1)TCP 粘包和拆包基本介绍
-
TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket, 因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
-
由于 TCP 无消息保护边界, 需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题, 看一张图
-
示意图 TCP 粘包、拆包图解
对图的说明:
假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
-
服务端分两次读取到了两个独立的数据包,分别是 D1 和 D2,没有粘包和拆包
-
服务端一次接受到了两个数据包,D1 和 D2 粘合在一起,称之为 TCP 粘包
服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
- 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。
(2)TCP 粘包和拆包现象实例
在编写 Netty程序时,如果没有做处理,就会发生粘包和拆包的问题:
【1】MyServer
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【2】MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
【3】MyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
import java.util.UUID;
public class MyServerHandler extends SimpleChannelInboundHandler<ByteBuf>{
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
//将buffer转成字符串
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("服务器接收到数据 " + message);
System.out.println("服务器接收到消息量=" + (++this.count));
//服务器回送数据给客户端, 回送一个随机id ,
ByteBuf responseByteBuf = Unpooled.copiedBuffer(UUID.randomUUID().toString() + " ", Charset.forName("utf-8"));
ctx.writeAndFlush(responseByteBuf);
}
}
【4】MyClient
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
【5】MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyClientHandler());
}
}
【6】MyClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.nio.charset.Charset;
public class MyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 hello,server 编号
for(int i= 0; i< 10; ++i) {
ByteBuf buffer = Unpooled.copiedBuffer("hello,server " + i, Charset.forName("utf-8"));
ctx.writeAndFlush(buffer);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
byte[] buffer = new byte[msg.readableBytes()];
msg.readBytes(buffer);
String message = new String(buffer, Charset.forName("utf-8"));
System.out.println("客户端接收到消息=" + message);
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
结果,服务器收到的消息数量=7不是10说明有拆包
(3)TCP 粘包和拆包解决方案
-
使用自定义协议 + 编解码器 来解决
-
关键就是要解决 服务器端每次读取数据长度的问题, 这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包 。
看一个具体的实例:
-
要求客户端发送 5 个 Message 对象, 客户端每次发送一个 Message 对象
-
服务器端每次接收一个 Message, 分 5 次进行解码, 每读取到 一个 Message , 会回复一个 Message 对象 给客户端
- 代码演示
【1】MyServer
public class MyServer {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).childHandler(new MyServerInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【2】MyServerInitializer
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageDecoder());//解码器
pipeline.addLast(new MyMessageEncoder());//编码器
pipeline.addLast(new MyServerHandler());
}
}
【3】MyServerHandler
//处理业务的handler
public class MyServerHandler extends SimpleChannelInboundHandler<MessageProtocol>{
private int count;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
//接收到数据,并处理
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println();
System.out.println();
System.out.println();
System.out.println("服务器接收到信息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("服务器接收到消息包数量=" + (++this.count));
//回复消息
String responseContent = UUID.randomUUID().toString();
int responseLen = responseContent.getBytes("utf-8").length;
byte[] responseContent2 = responseContent.getBytes("utf-8");
//构建一个协议包
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(responseLen);
messageProtocol.setContent(responseContent2);
ctx.writeAndFlush(messageProtocol);
}
}
【4】MyClient
public class MyClient {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.handler(new MyClientInitializer()); //自定义一个初始化类
ChannelFuture channelFuture = bootstrap.connect("localhost", 7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
【5】MyClientInitializer
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MyMessageEncoder()); //加入编码器
pipeline.addLast(new MyMessageDecoder()); //加入解码器
pipeline.addLast(new MyClientHandler());
}
}
【6】MyClientHandler
public class MyClientHandler extends SimpleChannelInboundHandler<MessageProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//使用客户端发送10条数据 "今天天气冷,吃火锅" 编号
for(int i = 0; i< 5; i++) {
String mes = "今天天气冷,吃火锅";
byte[] content = mes.getBytes(Charset.forName("utf-8"));
int length = mes.getBytes(Charset.forName("utf-8")).length;
//创建协议包对象
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
ctx.writeAndFlush(messageProtocol);
}
}
// @Override
protected void channelRead0(ChannelHandlerContext ctx, MessageProtocol msg) throws Exception {
int len = msg.getLen();
byte[] content = msg.getContent();
System.out.println("客户端接收到消息如下");
System.out.println("长度=" + len);
System.out.println("内容=" + new String(content, Charset.forName("utf-8")));
System.out.println("客户端接收消息数量=" + (++this.count));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常消息=" + cause.getMessage());
ctx.close();
}
}
【7】MessageProtocol
//协议包
public class MessageProtocol {
private int len; //关键
private byte[] content;
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte[] getContent() {
return content;
}
public void setContent(byte[] content) {
this.content = content;
}
}
【8】MyMessageDecoder
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder decode 被调用");
//需要将得到二进制字节码-> MessageProtocol 数据包(对象)
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
//封装成 MessageProtocol 对象,放入 out, 传递下一个handler业务处理
MessageProtocol messageProtocol = new MessageProtocol();
messageProtocol.setLen(length);
messageProtocol.setContent(content);
out.add(messageProtocol);
}
}
【9】MyMessageEncoder
public class MyMessageEncoder extends MessageToByteEncoder<MessageProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MessageProtocol msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder encode 方法被调用");
out.writeInt(msg.getLen());
out.writeBytes(msg.getContent());
}
}
最后
以上就是稳重微笑为你收集整理的netty进阶的全部内容,希望文章能够帮你解决netty进阶所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复