概述
Netty服务器启动源码剖析
文章目录
- Netty服务器启动源码剖析
- 1、Netty服务器启动源码剖析
- 1.1、执行new NioEventLoopGroup()时发生了什么
- 1.1.1、NioEventLoopGroup 成分
- 1.1.2、追踪 new NioEventLoopGroup()
- 1.1.3、总结
- 1.2、引导类ServerBootstrap的创建与配置
- 1.2.1、ServerBootstrap 的成分
- 1.2.2、ServerBootstrap 的配置
- 1.2.3、总结
- 1.3、执行ServerBootstrap.bind(PORT)时发生了什么
- 1.3.1、doBind 源码
- 1.3.2、run 死循环
- 1.3.3、总结
1、Netty服务器启动源码剖析
1.1、执行new NioEventLoopGroup()时发生了什么
本次分析创建workerGroup的过程(创建 bossGroup 的过程同理):
/** * EventLoopGroup 是一个线程组,其中的每一个线程都在循环执行着三件事情: * select:轮询注册在其中的 Selector 上的 Channel 的 IO 事件 * processSelectedKeys:在对应的 Channel 上处理 IO 事件 * runAllTasks:再去以此循环处理任务队列中的其他任务 */ EventLoopGroup workGroup = new NioEventLoopGroup();
1.1.1、NioEventLoopGroup 成分
Debug查看 workerGroup 的“成分”,可以看到它包含 8(cpu核数*2) 个 NioEventLoop,每个 NioEventLoop 里面有选择器、任务队列、执行器等等:
NioEventLoop继承关系图:
1.1.2、追踪 new NioEventLoopGroup()
追踪 new NioEventLoopGroup() 的底层调用:
(1)new NioEventLoopGroup()
红色框圈住的构造方法的源码为:
/** * Create a new instance. * * @param nThreads the number of threads that will be used by this instance. * @param executor the Executor to use, or {@code null} if the default should be used. * @param chooserFactory the {@link EventExecutorChooserFactory} to use. * @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call */ protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } // 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command //public final class ThreadPerTaskExecutor implements Executor { // private final ThreadFactory threadFactory; // // public ThreadPerTaskExecutor(ThreadFactory threadFactory) { // if (threadFactory == null) { // throw new NullPointerException("threadFactory"); // } // this.threadFactory = threadFactory; // } // // @Override // public void execute(Runnable command) { // threadFactory.newThread(command).start(); // } //} } // 这里定义了一个容量为 nThreads 的 EventExecutor 的数组 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // 往 EventExecutor 数组中添加元素 children[i] = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { // 添加元素失败,则 shutdown 每一个 EventExecutor for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } // chooser 的作用是为了实现 next()方法,即从 group 中挑选一个 NioEventLoop 来处理连接上 IO 事件的方法 chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { // EventExecutor 的终止事件回调方法 @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { // 通过本类中定义的 Promise 属性的.setSuccess()方法设置结果,所有的监听者可以拿到该结果 terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { // 为每一个 EventExecutor 添加终止事件监听器 e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }
(2):newChild(executor, args) 方法,主要关注 NioEventLoop 中的选择器、任务队列、执行器等成分是从哪来的。
这里的 newChild() 方法包含了构建每一个 NioEventLoop 的细节,可以看到,newChild()调用了 NioEventLoop 的构造函数来构建每一个 NioEventLoop 实例。
执行器(executor)
调用 NioEventLoop 的构造函数的时候,传入的参数 parent 为上一层调用者,executor 为 ThreadPerTaskExecutor 的实例。上文的代码注释已经讲明了其来源和功能,如下:
// 这里的 ThreadPerTaskExecutor 实例是下文用于创建 EventExecutor 实例的参数 if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // ThreadPerTaskExecutor 的源代码如下,它的功能是从线程工厂中获取线程来执行 command //public final class ThreadPerTaskExecutor implements Executor { // private final ThreadFactory threadFactory; // // public ThreadPerTaskExecutor(ThreadFactory threadFactory) { // if (threadFactory == null) { // throw new NullPointerException("threadFactory"); // } // this.threadFactory = threadFactory; // } // // @Override // public void execute(Runnable command) { // threadFactory.newThread(command).start(); // } //} }
选择器(selector)
NioEventLoop 的构造方法中有一个 openSelector(),它完成了选择器(多路复用器)的初始化。其中 provider(源码追踪图1的step-3)、selectStrategy(源码追踪图1的step-4) 由上一层传入。
openSelector() 源码分析
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { // 通过往下追踪发现 provider.openSelector()最终调用了 WindowsSelectorImpl 类的构造方法构造出一个 Selector,因此 unwrappedSelector 是 WindowsSelectorImpl 的实例 unwrappedSelector = provider.openSelector(); //public class WindowsSelectorProvider extends SelectorProviderImpl { // public WindowsSelectorProvider() { // } // // public AbstractSelector openSelector() throws IOException { // return new WindowsSelectorImpl(this); // } //} } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } // Netty 对 NIO 的 Selector 的 selectedKeys 进行了优化(默认设置),用户可以通过 io.netty.noKeySetOptimization 开关决定是否启用该优化项。 // 常量 DISABLE_KEY_SET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false); if (DISABLE_KEY_SET_OPTIMIZATION) { // 若没有开启 selectedKeys 优化,直接返回 return new SelectorTuple(unwrappedSelector); // SelectorTuple(Selector unwrappedSelector) { // this.unwrappedSelector = unwrappedSelector; // this.selector = unwrappedSelector; // } } // 若开启 selectedKeys 优化,需要通过反射的方式从 Selector 实例中获取 selectedKeys 和 publicSelectedKeys,将上述两个成员变量置为可写,然后通过反射的方式使用 Netty 构造的 selectedKeys 包装类selectedKeySet 将原 JDK 的 selectedKeys 替换掉。 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField); if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
任务队列(taskQueue)
在 NioEventLoop 的构造器中,通过 rejectedExecutionHandler、queueFactory 构造任务队列,newTaskQueue()根据参数 queueFactory 产生 Queue的实例。其中 rejectedExecutionHandler (源码追踪图1的step-5)由上一层传入,queueFactory可以自定义传入,否则为空。
追踪 super 方法
- 看到 newTaskQueue()根据参数 queueFactory 产生的 Queue实例最终被赋值给了 SingleThreadEventExecutor 的 taskQueue 属性,taskQueue 是 SingleThreadEventExecutor 中的任务队列,而 NioEventLoop 又继承于 SingleThreadEventExecutor,因此 NioEventLoop 也就具有这个任务队列了。
- 同理,NioEventLoop 中的定时任务队列 scheduledTaskQueue 也是这么得到的:AbstractScheduledEventExecutor 包含 scheduledTaskQueue 属性,NioEventLoop 又继承于 AbstractScheduledEventExecutor,构造 NioEventLoop 的时候初始化这个 scheduledTaskQueue,因此 NioEventLoop 就有了定时任务队列。
1.1.3、总结
(1)NioEventLoopGroup 的无参数构造函数会调用 NioEventLoopGroup 的有参数构造函数,最终把下面的参数传递给父类 MultithreadEventLoopGroup 的有参数构造函数。
nThreads=cpu核数*2 executor=null chooserFactory=DefaultEventExecutorChooserFactory.INSTANCE selectorProvider=SelectorProvider.provider() selectStrategyFactory=DefaultSelectStrategyFactory.INSTANCE rejectedExecutionHandler=RejectedExecutionHandlers.reject()
(2)父类 MultithreadEventLoopGroup 的有参数构造函数创建一个 NioEventLoop 的容器 children = new EventExecutor[nThreads],并构建出 若干个 NioEventLoop 的实例放入其中。
(3)构建每一个 NioEventLoop 调用的是 children[i] = newChild(executor, args)。
(4)newChild()方法最终调用了 NioEventLoop 的构造函数,初始化其中的选择器、任务队列、执行器等成分。
(5)本节只详述了 NioEventLoop 中选择器、任务队列、执行器三个成分的用途和由来,对于其他成分,可按照本节的代码追踪思路继续探究。
1.2、引导类ServerBootstrap的创建与配置
本节一起看下服务端启动类 ServerBootstrap 的创建与配置代码背后的逻辑。
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 设置线程组 serverBootstrap.group(bossGroup, workerGroup) // 说明服务器端通道的实现类(便于 Netty 做反射处理) .channel(NioServerSocketChannel.class) // 临时存放已完成三次握手的请求的队列的最大长度。 .option(ChannelOption.SO_BACKLOG, 100) // 对服务端的 NioServerSocketChannel 添加 Handler // LoggingHandler 是 netty 内置的一种 ChannelDuplexHandler,既可以处理出站事件,又可以处理入站事件,即 LoggingHandler 既记录出站日志又记录入站日志。 .handler(new LoggingHandler(LogLevel.INFO)) // 对服务端接收到的、与客户端之间建立的 SocketChannel 添加 Handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { // sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理 p.addLast(sslCtx.newHandler(ch.alloc())); } // 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志 // p.addLast(new LoggingHandler(LogLevel.INFO)); // 业务 serverHandler p.addLast(serverHandler); } });
ServerBootstrap 提供了一系列的链式配置方法,具体而言就是 ServerBootstrap 对象的一个配置方法(比如.group())处理完配置参数之后,会将当前 ServerBootstrap 对象返回,这样就能紧随其后继续调用该对象的其他配置方法(比如.channel())。这是面向对象语言中常见的一种编程模式。
1.2.1、ServerBootstrap 的成分
此时 ServerBootstrap 刚刚被创建,且未进行设置。它包含一个 ServerBootstrapConfig 对象,而这个对象又引用了 ServerBootstrap 对象,因此两个是互相引用、互相包含的关系。此外还包含了 group、handler、childGroup、childHandler 等成分,目前这些成分都为 null,后面进行的各种设置就是为这些成分赋值。
ServerBootstrap 和 Bootstrap 一样,都继承于抽象类 AbstractBootstrap。因此两者具备很多相同的属性和 API,例如 group、channelFactory、localAddress、options、attrs、handler、channel()、channelFactory()、register()、bind()等等。
1.2.2、ServerBootstrap 的配置
.group(bossGroup, workerGroup):作用是把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
.channel(NioServerSocketChannel.class):作用是通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。
- 服务端的 NioServerSocketChannel 实例就是通过这个 channelFactory 创建的,不过现在还没有开始创建,要等到后面调用.bind()的时候才会创建。
.option(ChannelOption.XXX, YYY):作用是将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
.childOption(ChannelOption.XXX, YYY):作用是将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
.handler(ChannelHandler handler):作用是将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。
- 这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
.childHandler(ChannelHandler handler):作用是为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
ChannelInitializer 本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { // sslCtx.newHandler(ch.alloc())对传输的内容做安全加密处理 p.addLast(sslCtx.newHandler(ch.alloc())); } // 如果需要的话,可以用 LoggingHandler 记录与客户端之间的通信日志 // p.addLast(new LoggingHandler(LogLevel.INFO)); // 业务 serverHandler p.addLast(serverHandler); } })
p.addLast(serverHandler):调用了 Pipeline 的 addLast 方法向 Pipeline 中的双向链表添加 ChannelHandlerContext 元素:
1.2.3、总结
- (1).group(bossGroup, workerGroup)把 bossGroup 和 workerGroup 两个参数赋值给 ServerBootstrap 的成员变量 group(从父类 AbstractBootstrap 继承而来)和 childGroup。
- (2).channel(NioServerSocketChannel.class)通过反射机制给当前 ServerBootstrap 中的 channelFactory 属性(从父类 AbstractBootstrap 继承而来)赋值。在调用.bind()的时候 channelFactory 会创建 NioServerSocketChannel 的实例。
- (3).option(ChannelOption.XXX, YYY) 将可选项放入一个 options 集合中(给 NioServerSocketChannel 使用)。
- (4).childOption(ChannelOption.XXX, YYY) 将可选项放入一个 childOptions 集合中(给 NioSocketChannel 使用)。
- (5).handler(ChannelHandler handler) 将某个 Handler 赋值给 ServerBootstrap 实例的 handler 属性(从父类 AbstractBootstrap 继承而来)。这个 handler 最终在.bind() 的时候,在 ServerBootstrap.init() 方法中被放入 NioServerSocketChannel 实例的 pipeline 中。
- (6).childHandler(ChannelHandler handler) 参数通常使用ChannelInitializer,其本质是 ChannelHandler,通过重写 initChannel(SocketChannel ch) 方法,为接收客户端连接请求产生的 NioSocketChannel 实例的 pipeline 添加 Handler。
1.3、执行ServerBootstrap.bind(PORT)时发生了什么
本节介绍的 bind(PORT) ,实质是调用 AbstractBootstrap 的 doBind(final SocketAddress localAddress) 方法。
1.3.1、doBind 源码
private ChannelFuture doBind(final SocketAddress localAddress) 源码:
PS: 主要关注 initAndRegister() 和 doBind0(regFuture, channel, localAddress, promise)。
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister();// (1) 初始化 NioServerSocketChannel 的实例,并且将其注册到 bossGroup 中的 EvenLoop 中的 Selector 中 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // 若异步过程 initAndRegister()已经执行完毕,则进入该分支 // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise);//(2) 调用底层 JDK 接口完成端口绑定和监听 return promise; } else { // 若异步过程 initAndRegister()还未执行完毕,则进入该分支 // Registration future is almost always fulfilled already, but just in case it's not. final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); // 监听 regFuture 的完成事件,完成之后再调用 doBind0(regFuture, channel, localAddress, promise); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
(1)initAndRegister() 源码:
PS: 主要关注 newChannel() 、init(channel)、register(channel) 方法。
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel();//(1-1) 创建 NioServerSocketChannel 实例 init(channel);//(1-2) 对该 NioServerSocketChannel 进行初始化 } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel);//(1-3) 最终把 NioServerSocketChannel 实例注册到 bossGroup 中 EventLoop 中的 Selector 上。 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
(1-1) newChannel() 源码追踪:
- 前面介绍 ServerBootstrap 的配置.channel(NioServerSocketChannel.class) 已经说明了 channelFactory 的作用。
- newChannel() 实质是 ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例。
- 在 NioServerSocketChannel 的空构造方法往下追踪源码,会发现传递了 SelectionKey.OP_ACCEPT 参数,并且赋予给 readInterestOp 属性,作用是标识该 Channel 感兴趣的事件。
- 继续追踪下去会在 AbstractChannel(Channel parent) 中的 newChannelPipeline() -> DefaultChannelPipeline(Channel channel) 创建了 ChannelPipeline。里面的 head、tail 实质是 ChannelHandlerContext 类型的双向链表。
(1-2)init(channel) 源码:
PS: init(channel) 方法在 AbstractBootstrap 中是抽象方法,在 ServerBootstrap 中进行了实现。注意这里添加了 ServerBootstrapAcceptor ,而且这是一个 ChannelInboundHandler。
/** * ServerBootstrap.init()方法,它在 channel = channelFactory.newChannel() 之后被执行,用于初始化这个 channel */ @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); // 通过.option()设置的 TCP 参数就在这里应用 //static void setChannelOptions( // Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) { // for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) { // setChannelOption(channel, e.getKey(), e.getValue(), logger); // } //} } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { // 通过.attr()设置的附加属性就在这里应用 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { // 获取 NioServerSocketChannel 实例的 pipeline final ChannelPipeline pipeline = ch.pipeline(); // 这里的 config.handler()就是前面通过 .handler(ChannelHandler handler) 设置的 handler ChannelHandler handler = config.handler(); if (handler != null) { // 将这个 handler 添加到 NioServerSocketChannel 实例的 pipeline 中 pipeline.addLast(handler); } // 异步执行向 pipeline 添加 ServerBootstrapAcceptor 的步骤 ch.eventLoop().execute(new Runnable() { @Override public void run() { // ServerBootstrapAcceptor 是一个 ChannelInboundHandler pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
(1-3)register(channel) 源码追踪:
在上图的最后一个红色框圈住的代码处,NioServerSocketChannel 的实例被注册到 bossGroup 中 EventLoop 中的 Selector 上(ops: 0 在这里猜测是ready的意思,因为后面会在 AbstractNioChannel.doBeginRead() 方法中真正设置key感兴趣的ops)
doBeginRead() 方法会在channel首次注册激活或者每次readComplete之后发生(如果开启了isAutoRead,默认是开启的)。需要注意的是,即使读事件发生的时候,readyOps是0,同样可以进行read。
(2)doBind0(regFuture, channel, localAddress, promise) 源码追踪:
- 在 NioServerSocketChannel 中的 javaChannel().bind(localAddress, config.getBacklog()) 调用底层 JDK 接口完成端口绑定和监听。
- 追踪下去会发现最终调用了一个 Native 方法把.bind(PORT)最终托管给了 JVM,然后 JVM 进行系统调用。
1.3.2、run 死循环
在调用register0、doBind0等方法的时候,会委托给 EventLoop 去执行,如果是当前 EventLoop,直接执行 register0 方法,否则会交给 EventLoop.execute(Runnable task)(一般情况下都是会这样异步执行)。
// SingleThreadEventExecutor.execute(Runnable task): @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); // 添加任务到队列 addTask(task); if (!inEventLoop) { // 如果当前线程不属于该 EventLoop,EventLoop 需要启动新线程。最终会执行 SingleThreadEventExecutor.this.run() 方法,进入到了 NioEventLoop 中 run 方法的死循环里。 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
异步添加任务到队列之后,会在 run 循环里面,通过 runAllTasks() 方法执行队列里面的任务。即register0、doBind0等方法会在这时候被处理。
NioEventLoop 中的死循环,不断执行以下三个过程:
- select:轮训注册在其中的 Selector 上的 Channel 的 IO 事件。
- processSelectedKeys:在对应的 Channel 上处理 IO 事件。
- runAllTasks:再去以此循环处理任务队列中的其他任务。
// NioEventLoop.run(): @Override protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); continue; } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
1.3.3、总结
- (1)首先调用 AbstractBootstrap 中的 initAndRegister() 方法完成 NioServerSocketChannel 实例的初始化和注册。
- (2)然后调用 NioServerSocketChannel 实例的 doBind() 方法,最终调用 sun.nio.ch.Net 中的 bind()和 listen() 完成端口绑定和客户端连接监听。
- (3)在真正 register0(注册)和 bind0 (绑定)之前,会委托当前 eventLoop 的 executor 去执行,实质上是在死循环run方法中通过 runAllTasks() 方法执行 eventLoop 的队列里面的任务。
最后
以上就是安静长颈鹿为你收集整理的Netty服务器启动源码剖析Netty服务器启动源码剖析的全部内容,希望文章能够帮你解决Netty服务器启动源码剖析Netty服务器启动源码剖析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复