概述
为了整明白netty源码花了将近一个月的时间。首先掌握基础知识:操作系统的网络IO、TCP传输协议、socket是什么、网络IO模型、线程池中线程的执行原理、线程异步执行等一系列学习netty源码之前都要掌握的知识。
netty源码
服务端流程:
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
netty启动主线程和boss线程的初始化流程
1、new NioEventLoopGroup(1):初始化一个创建线程的线程工厂(ThreadPerTaskExecutor对象持有,一个NioEventLoopGroup对应一个ThreadPerTaskExecutor对象);创建NioEventLoop对象,该对象是被reactor模型中处理客户端连接的bossGroup(注意这里只会创建一个线程)所持有,它包装了SelectorProvider、Selector、executor执行器(ThreadPerTaskExecutor对象),taskQueue(任务队列Queue<Runnable>),线程对象thread(此时尚未创建,记住这里很重要)。NioEventLoopGroup持有一个NioEventLoop类型的数组对象。
2、new NioEventLoopGroup():创建流程跟 1 一样,只不过这里会创建多个NioEventLoop对象,具体多少个,netty会根据机器的核心处理器计算得到。这些NioEventLoop对象可以称为workerGroup。
3、b = new ServerBootstrap():创建ServerBootstrap对象,它继承了AbstractBootstrap
4、b.group(bossGroup, workerGroup):ServerBootstrap对象通过group()维护住bossGroup和workerGroup,bossGroup是ServerBootstrap父类AbstractBootstrap的group属性维护的,workerGroup是ServerBootstrap对象的childGroup
5、channel(NioServerSocketChannel.class):创建ReflectiveChannelFactory对象赋值给ServerBootstrap对象的channelFactory属性(该属性通过父类维护),而ReflectiveChannelFactory对象通过获取NioServerSocketChannel.class的构造方法维护构造器constructor的属性。NioServerSocketChannel对象在创建时会绑定SelectionKey.OP_ACCEPT事件。
6、option(ChannelOption.SO_BACKLOG, 100):option表示可选,这里是给tcp设置调优的参数。给ServerBootstrap对象的options属性赋值(父类维护)
7、handler(new LoggingHandler(LogLevel.INFO)):给ServerBootstrap对象handler属性赋值LoggingHandler对象。(该属性通过父类维护)
8、childHandler(ChannelInitializer):给ServerBootstrap对象的childHandler属性赋值。
通过步骤3~8对ServerBootstrap对象的创建和属性赋值就完成了。
9、b.bind(int):bind()看似绑定端口操作,实际上netty在这里做了很多重要的操作,例如reactor线程模型、异步执行handler等就是这里实现的。最为核心的方法initAndRegister():
(1)channel=channelFactory.newChannel():通过步骤5维护的NioServerSocketChannel Class对象的构造器来反射创建NioServerSocketChannel对象,这个过程中会创建jdk原生ServerSocketChannel类的对象并将其赋值给它的属性ch,同时ch置为非阻塞模式;接下里会维护对连接事件感兴趣的属性readInterestOp,以及创建DefaultChannelPipeline对象赋值给pipeline属性,pipeline对象也维护了ServerSocketChannel对象,创建pipeline对象过程中会构建一个ChannelHandler类型(也是AbstractChannelHandlerContex类型)的单向链表,头尾属性分别是head和tail。
(2)init(NioServerSocketChannel):调用ServerBootstrap#init()方法对ServerBootstrap对象进行初始化。首先通过ch拿到pipeline,给pipeline对象添加匿名内部类ChannelInitializer对象,ChannelInitializer这里只实现initChannel(Channel)方法,ChannelInitializer最终会被包装为DefaultChannelHandlerContext添加到pipeline的单向链表中。
(3)config().group().register(channel):实际上执行的是group.register(NioServerSocketChannel),将group对象赋值给NioServerSocketChannel对象的属性group,将他们进行绑定。到目前为止执行的线程一直是main线程,接下来main线程使用group.execute(Runnable1)-->addTask(Runnable1)将task=Runnable1丢进taskQueue队列-->doStartThread()-->executor.execute(Runnable2)使用ThreadPerTaskExecutor对象持有的线程工厂创建一个线程绑定在group对象中某一个NioEventLoop对象的thread属性(如果有多个NioEventLoop对象这里轮询获取,当前是bossGroup只有一个),暂且将这个线程称为boss线程。至此,主线程所做的核心处理基本结束,调用await()方法进入WAITING状态。boss线程开始执行Runnable2并调用NioEventLoop#run()进入for(;;)循环轮询selector选择器,在此期间会将之前丢进任务队列中的task拉取出来执行,也就是Runnable1,Runnable1只会调用register0()方法。
######## boss线程执行register0()的流程 #######
doRegister():将NioServerSocketChannel对象注册到bossGroup的NioEventLoop对象的selector上,但此时并没有注册感兴趣的连接事件,只是将SelectionKey.ops置为0。
pipeline.invokeHandlerAddedIfNeeded():执行(2)pipeline中ChannelInitializer#initChannel(ch)方法,它会继续向taskQueue添加一个Runnable3,这个任务也不会立即执行。随后将当前ChannelInitializer从pipeline中移除。
safeSetSuccess(promise):selectorKey会真正绑定连接事件。这里会经过三次taskQueue的任务入队出队操作,依次调用的核心方法doBind0(),invokeBind(),invokeRead()最后绑定连接事件。
pipeline.fireChannelRegistered():调用pipeline中从HeadContext到TailContext结束的所有channelRegistered()方法
---------从这里开始当有第一个客户端连接过来才会执行的方法(worker1线程)--------
pipeline.fireChannelActive():调用pipeline中从HeadContext到TailContext结束的所有channelActive()方法,其中headContext中channelActive()会调用readIfIsAutoRead()把NioSocketChannel注册到workerGroup中第一个NioEventLoop的selector,SelectionKey对读事件感兴趣。
beginRead():该方法由workerGroup中第一个NioEventLoop中绑定的线程执行,这个线程是由boss线程创建的。它负责开始读取数据。
----------客户端连接处理结束----------
####### boss线程执行register0()的流程结束 #######
任务队列taskQueue第一个任务执行结束后,boss线程重新从taskQueue中拉取任务,此时获取的任务是执行第一个task时添加到Runnable3任务,执行该任务创建一个ServerBootstrapAcceptor对象添加到pipeline中,所以ServerBootstrapAcceptor也是一个ChannelHandler处理器,ServerBootstrapAcceptor对象维护了childGroup(workerGroup),childHandler,enableAutoReadTask(是一个任务引用???)。
至此boss线程初始化的工作已完成,真正开始轮询selector选择器,等待客户端的连接。
boss线程执行第一个客户端与netty服务建立连接流程
当第一个客户端向netty服务发起建立连接时,boss线程接收到连接processSelectedKeys()方法处理连接。processSelectedKeys()处理如下
unsafe.read():处理连接事件执行的是NioMessageUnsafe#read(),创建NioSocketChannel对象并绑定其来源于的NioServerSocketChannel对象和SelectionKey.OP_READ读事件。
pipeline.fireChannelRead(NioSocketChannel):依次调用pipeline中从HeadContext开始直到TailContext()结束的channelRead()方法。期间会执行ServerBootstrapAcceptor#channelRead()方法,它会调用child.pipeline().addLast(childHandler);将childHandler添加到child(上面创建的NioSocketChannel)的pipeline中;接下来执行childGroup.register(child),将Runnable(register0())任务通过addTask(task)丢进任务队列中,并创建第一个workerGroup关联的线程,暂且命名为worker1线程。
最后调用自己绑定的pipeline.fireChannelReadComplete();重新轮询selector选择器,等待下一次连接事件
#########worker1线程#######
worker1线程重新执行register0()的逻辑,主要是workerGroup中第一个NioEventLoop的selector注册读事件。具体参照上面分析的register0()方法。
worker1执行完register0()后,重新轮询自己绑定的selector,此时会监听到一个读事件,依然会调用processSelectedKeys()处理读事件:
unsafe.read():处理读事件执行的是NioByteUnsafe#read(),首先将channel数据读到byteBuf中,执行fireChannelRead(byteBuf),依次调用绑定的pipeline中所有handler#channelRead(byteBuf)方法对数据进行处理,也就是业务添加的childHandler#channelRead(),正式对客户端发过来的数据进行处理。
pipeline.fireChannelReadComplete():执行pipeline中所有handler#channelReadComplete()方法
worker1数据读取完成,重新开始轮询selector选择,监听下一次boss线程传递的连接事件。
#########worker1线程#######
Netty线程模型
流程图
最后附上花了近一个月整理的netty流程图,祝大家好运
Netty重点问题的解决
TCP粘包/拆包
明确什么是TCP粘包拆包?
TCP是面向流的,它无法理解上层业务的含义,它会根据TCP缓冲区(随机大小,根据对端的接收情况动态调整)的实际情况进行包的划分,所以TCP可能会把业务上认为的一个完整包拆分成多个包,也可能会把业务上多个包合成一个大的数据包向对端发送,这就是TCP粘包拆包。
主要分为以下几种解决方法:
消息定长:每个报文固定大小,不够的进行空格填充
分隔符:在包尾部增加特殊分隔符进行区分
将消息分为消息头和消息体:消息头标识消息的总长度(或者消息体的长度)。
netty使用LineBasedFrameDecoder+StringDecoder来解决TCP粘包拆包问题,LineBasedFrameDecoder的原理是判断是否有换行符("n"或"rn"),如果有就表示结束,以换行符为结束符的解码器。StringDecoder就是将接收到的对象转换为字符串。
netty还提供了DelimiterBasedFrameDecoder特殊分隔符的解码器,允许自定义分隔符;FixedLengthFrameDecoder固定长度的解码器。
序列化
java序列化,实现接口;
Protobuf
Jboss Marshalling
netty零拷贝
前面在NIO的博客中已经分析了什么是零拷贝,简单来说就是减少数据在内核态和用户态之间的拷贝,以及减少上下文的切换次数从而提高读写数据的效率,因为是从操作系统层面的用户态和内核态分析的,总的来说比较抽象。今天从netty源码层面看看零拷贝到底是如何实现。
netty使用ByteBuf做为数据读写的缓冲区,如netty server在读取数据时使用NioByteUnsafe#read()方法:
接下来调用的是 AbstractByteBufAllocator#ioBuffer(int, int):从方法名就体现出了直接内存和jvm堆内内存,通过创建直接内存并持有数据的引用来替代数据从内核态到用户态的拷贝。
继续看directBuffer是如何操作的
继续查看的实现PooledByteBufAllocator#newDirectBuffer(int, int)
最终调用 PooledUnsafeDirectByteBuf.newInstance(int)循环使用ByteBuf对象
debug netty默认的ByteBuf在里面只能找到一个指向数据内存区域的地址引用,找不到具体数据
重新创建一个heapmemory,可以看到里面存储了byte数组,它承载了数据
最后
以上就是大气招牌为你收集整理的Netty源码吐血总结的全部内容,希望文章能够帮你解决Netty源码吐血总结所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复