我是靠谱客的博主 唠叨西装,最近开发中收集的这篇文章主要介绍Netty源码_01_Netty服务端初始化一、前言二、服务端启动三、Netty服务端初始化,控制台如何输出handlerAdded、channelRegistered、channelActive四、面试金手指五、尾声,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

文章目录

  • 一、前言
  • 二、服务端启动
    • 2.1 服务端启动
    • 2.2 服务端启动的每一行代码
  • 三、Netty服务端初始化,控制台如何输出handlerAdded、channelRegistered、channelActive
    • 3.1 bind()两个子方法:initAndRegister() + doBind0()
    • 3.2 初始化和注册:initAndRegister()
      • 3.2.1 initAndRegister()三个子方法:newChannel() + init(channel)+register(channel)
      • 3.2.2 initAndRegister()第一步:newChannel()工厂模式新建一个channel
        • 3.2.2.1 使用new新建一个ReflectiveChannelFactory(channelClass作为实参)
        • 3.2.2.2 反射方式新建一个NioServerSocketChannel对象
        • 3.2.2.3 反射调用进一步深入:NioServerSocketChannel无参构造函数
      • 3.2.3 initAndRegister()第二步:init(channel)方法初始化这个新建的channel
        • 3.2.3.1 init(channel)第一步:设置option和attr
        • 3.2.3.2 init(channel)第二步:设置新接入channel的option和attr
        • 3.2.3.3 init(channel)第三步:加入新连接处理器
      • 3.2.4 initAndRegister()第三步,register(channel)方法将这个新建并初始化好的channel注册到某个对象(这里就是打印控制台三句的地方了)
        • 3.2.4.1 register(channel)方法第一步:register0()执行之前
        • 3.2.4.2 register(channel)方法第二步:register0()执行,控制台打印handlerAdded、channelRegistered
        • 3.2.4.3 register(channel)方法第三步:register0()执行,控制台不打印第三句
    • 3.3 doBind0()执行,控制台打印第三句
      • 3.3.1 doBind0():控制台打印第三句
      • 3.3.2 doBind0()调用的bind()方法
      • 3.3.3 NioMessageUnsafe.bind()方法,真正控制台打印第三句的地方
        • 3.3.3.1 NioMessageUnsafe.bind()方法
        • 3.3.3.2 从Netty的doBind()方法到JavaSE的bind()方法
  • 四、面试金手指
    • 4.1 Netty初始化流程总述
    • 4.2 对于服务端demo中每一行的解释
    • 4.3 initAndRegister()
    • 4.4 doBind0()
  • 五、尾声

一、前言

二、服务端启动

2.1 服务端启动

下面是一个非常简单的服务端启动代码

public final class SimpleServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new SimpleServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private static class SimpleServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive");
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered");
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded");
        }
    }
}

2.2 服务端启动的每一行代码

简单的几行代码就能开启一个服务端,端口绑定在8888,使用nio模式,下面讲下每一个步骤的处理细节

EventLoopGroup 一个死循环,不停地检测IO事件,处理IO事件,执行任务;

ServerBootstrap 是服务端的一个启动辅助类,通过给他设置一系列参数来绑定端口启动服务;

group(bossGroup, workerGroup) 我们需要两种类型的人干活,一个是老板,一个是工人,老板负责从外面接活,接到的活分配给工人干,放到这里,bossGroup的作用就是不断地accept到新的连接,将新的连接丢给workerGroup来处理;

.channel(NioServerSocketChannel.class) 表示服务端启动的是nio相关的channel,channel在netty里面是一大核心概念,可以理解为一条channel就是一个连接或者一个服务端bind动作,后面会细说;

.handler(new SimpleServerHandler() 表示服务器启动过程中,需要经过哪些流程,这里SimpleServerHandler最终的顶层接口为ChannelHander,是netty的一大核心概念,表示数据流经过的处理器,可以理解为流水线上的每一道关卡;

childHandler(new ChannelInitializer<SocketChannel>); 表示一条新的连接进来之后,该怎么处理,也就是上面所说的,老板如何给工人配活;

ChannelFuture f = b.bind(8888).sync(); 这里就是真正的启动过程了,绑定8888端口,等待服务器启动完毕,才会进入下行代码;

f.channel().closeFuture().sync(); 等待服务端关闭socket;

bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); 关闭两组死循环;

上述代码可以很轻松地再本地跑起来,最终控制台的输出为:

handlerAdded
channelRegistered
channelActive

关于为什么会顺序输出这些,这是本文的核心问题,也是Netty服务端初始化的核心问题,且看第三部分。

三、Netty服务端初始化,控制台如何输出handlerAdded、channelRegistered、channelActive

3.1 bind()两个子方法:initAndRegister() + doBind0()

ServerBootstrap 一系列的参数配置其实没啥好讲的,无非就是使用method chaining的方式将启动服务器需要的参数保存到filed。我们的重点落入到下面这段代码

b.bind(8888).sync();

这里说一句:我们刚开始看源码,来确定哪行代码是最终启动服务的入口,在这里,我们已经确定了bind方法是入口,我们跟进去,分析

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 

通过端口号创建一个 InetSocketAddress,然后继续bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

validate() 验证服务启动需要的必要参数,然后调用doBind()

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}

这里,我去掉了细枝末节,让我们专注于核心方法,其实就两大核心一个是 initAndRegister(),以及doBind0()

其实,从方法名上面我们已经可以略窥一二,init->初始化,register->注册,那么到底要注册到什么呢?联系到nio里面轮询器的注册,可能是把某个东西初始化好了之后注册到selector上面去,最后bind,像是在本地绑定端口号,带着这些猜测,我们深入下去。

问题:doBind0()执行之前,需要初始化和注册,那么到底要注册到什么呢?

回答:联系到nio里面轮询器的注册,可能是把某个东西初始化好了之后注册到selector上面去。

3.2 初始化和注册:initAndRegister()

3.2.1 initAndRegister()三个子方法:newChannel() + init(channel)+register(channel)

final ChannelFuture initAndRegister() {
    Channel channel = null;
    // ...
    channel = channelFactory.newChannel();
    //...
    init(channel);
    //...
    ChannelFuture regFuture = config().group().register(channel);
    //...
    return regFuture;
}

我们还是专注于核心代码,抛开边角料,我们看到 initAndRegister() 做了几件事情:

1、工程模式新建一个channel;

2、init这个channel;

3、将这个channel register到某个对象;

我们逐步分析这三件事情。

3.2.2 initAndRegister()第一步:newChannel()工厂模式新建一个channel

3.2.2.1 使用new新建一个ReflectiveChannelFactory(channelClass作为实参)

我们首先要搞懂channel的定义,netty官方对channel的描述如下

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

这里的channel,由于是在服务启动的时候创建,我们可以和普通Socket编程中的ServerSocket对应上,表示服务端绑定的时候经过的一条流水线

我们发现这条channel是通过一个 channelFactory new出来的,channelFactory 的接口很简单

public interface ChannelFactory<T extends Channel> extends io.netty.bootstrap.ChannelFactory<T> {
    /**
     * Creates a new channel.
     */
    @Override
    T newChannel();
}

就一个方法,我们查看channelFactory被赋值的地方

AbstractBootstrap类
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;   // 返回AbstractBootstrap类对象
}

在这里被赋值,我们层层回溯,查看设置channel工厂的channelFactory()函数被调用的地方,发现最终是在这个函数中,ChannelFactory被new出

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));  //以channelClass为实参,构造一个ReflectiveChannelFactory,然后调用channelFactory()方法,将这个ReflectiveChannelFactory对象赋值给类变量,AbstractBootstrap类中的this.channelFactory,完成工厂对象的初始化
}

金手指:我们的demo程序调用channel(channelClass)方法的时候,将channelClass作为ReflectiveChannelFactory构造函数的实参创建出一个ReflectiveChannelFactory。

问题:在channel()方法中,以channelClass为实参,构造一个ReflectiveChannelFactory,然后调用channelFactory()方法,将这个ReflectiveChannelFactory对象赋值给类变量,AbstractBootstrap类中的this.channelFactory,完成工厂对象的初始化,那么,这个channel()方法是在哪里被调用的,调用的时候,实参channelClass是什么?

回答:在demo端代码中被调用,实参是NioServerSocketChannel.class,证据是: demo端的代码 .channel(NioServerSocketChannel.class);

3.2.2.2 反射方式新建一个NioServerSocketChannel对象

过渡问题:完成了channelFactory的新建(使用NioServerSocketChannel为实参,创建一个ReflectiveChannelFactory工厂对象,使用new新建对象),现在在channelFactory中创建channel?
回答: channelFactory.newChannel();

我们就可以推断出,最终是调用到 ReflectiveChannelFactory.newChannel() 方法,跟进

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }
}

看到clazz.newInstance();,我们明白了,原来是通过反射的方式来创建一个对象,而这个class就是我们在ServerBootstrap中传入的NioServerSocketChannel.class

结果,绕了一圈,最终创建channel相当于调用默认构造函数new出一个 NioServerSocketChannel对象

金手指:ReflectiveChannelFactory.newChannel() 方法 是通过反射的方式来创建一个对象,而这个class就是我们在ServerBootstrap中传入的NioServerSocketChannel.class。

3.2.2.3 反射调用进一步深入:NioServerSocketChannel无参构造函数

过渡问题:
ReflectiveChannelFactory类中的newChannel()方法,使用反射新建NioServerSocketChannel对象,调动的是NioServerSocketChannel的无参构造函数,我们看看NioServerSocketChannel的无参构造函数。

接下来我们就可以将重心放到 NioServerSocketChannel的默认构造函数

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();   // 实参  DEFAULT_SELECTOR_PROVIDER 
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));    // 实参 DEFAULT_SELECTOR_PROVIDER 
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    //...
    return provider.openServerSocketChannel();
}

通过SelectorProvider.openServerSocketChannel()创建一条server端channel,然后进入到以下方法,NioServerSocketChannel带参构造函数

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

这里第一行代码就跑到父类里面去了,第二行,new出来一个 NioServerSocketChannelConfig,其顶层接口为 ChannelConfig,netty官方的描述如下

A set of configuration properties of a Channel.

基本可以判定,ChannelConfig 也是netty里面的一大核心模块,初次看源码,看到这里,我们大可不必深挖这个对象,而是在用到的时候再回来深究,只要记住,这个对象在创建NioServerSocketChannel对象的时候被创建即可

我们继续追踪到 NioServerSocketChannel 的父类

AbstractNioMessageChannel类
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

继续往上追

AbstractNioChannel类
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;   // 简单地将前面 provider.openServerSocketChannel(); 创建出来的 ServerSocketChannel 保存到成员变量
    this.readInterestOp = readInterestOp;
    //...
    ch.configureBlocking(false);   // 调用ch.configureBlocking(false);
    //... 
}

这里,简单地将前面 provider.openServerSocketChannel(); 创建出来的 ServerSocketChannel 保存到成员变量,然后调用ch.configureBlocking(false);设置该channel为非阻塞模式,标准的jdk nio编程的玩法

这里的 readInterestOp 即前面层层传入的 SelectionKey.OP_ACCEPT,接下来重点分析 super(parent);(这里的parent其实是null,由前面写死传入)

AbstractChannel类
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

到了这里,又new出来三大组件,赋值到成员变量,分别为

id = newId();
protected ChannelId newId() {
    return DefaultChannelId.newInstance();
}

id是netty中每条channel的唯一标识,这里不细展开,接着

unsafe = newUnsafe();
protected abstract AbstractUnsafe newUnsafe();

查看Unsafe的定义

Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread

成功捕捉netty的又一大组件,我们可以先不用管TA是干嘛的,只需要知道这里的 newUnsafe方法最终属于类NioServerSocketChannel中

最后

pipeline = newChannelPipeline();

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
}

pipeline一看就知道是一个非循环双链表,新建两个指针head.next = tail; tail.prev = head;

初次看这段代码,可能并不知道 DefaultChannelPipeline 是干嘛用的,我们仍然使用上面的方式,查看顶层接口ChannelPipeline的定义

A list of ChannelHandlers which handles or intercepts inbound events and outbound operations of a Channel

从该类的文档中可以看出,该接口基本上又是netty的一大核心模块

到了这里,我们总算把一个服务端channel创建完毕了,将这些细节串起来的时候,我们顺带提取出netty的几大基本组件,先总结如下

Channel
ChannelConfig
ChannelId
Unsafe
Pipeline
ChannelHander

初次看代码的时候,我们的目标是跟到服务器启动的那一行代码,我们先把以上这几个组件记下来,等代码跟完,我们就可以自顶向下,逐层分析,我会放到后面源码系列中去深入到每个组件

总结一下,用户调用方法 Bootstrap.bind(port) 第一步就是通过反射的方式new一个NioServerSocketChannel对象,并且在new的过程中创建了一系列的核心组件,仅此而已,并无他,真正的启动我们还需要继续跟

3.2.3 initAndRegister()第二步:init(channel)方法初始化这个新建的channel

到了这里,你最好跳到文章最开始的地方回忆一下,第一步newChannel完毕,这里就对这个channel做init,init方法具体干啥,我们深入

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

初次看到这个方法,可能会觉得,哇塞,老长了,这可这么看?还记得我们前面所说的吗,庖丁解牛,逐步拆解,最后归一,下面是我的拆解步骤

3.2.3.1 init(channel)第一步:设置option和attr

    // 通过options0()得到属性,设置到channel上去
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        channel.config().setOptions(options);
    }
    
    // 通过attrs0()得到属性,设置到channel上去
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {   // 保证整个操作原子性的情况下操作
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();  // 得到每一个entry中的key
            channel.attr(key).set(e.getValue());  // 对于每一个entry,通过key设置value
        }
    }

通过这里我们可以看到,这里先调用options0()以及attrs0(),然后将得到的options和attrs注入到channelConfig或者channel中,关于option和attr是干嘛用的,其实你现在不用了解得那么深入,只需要查看最顶层接口ChannelOption以及查看一下channel的具体继承关系,就可以了解,我把这两个也放到后面的源码分析系列再讲

3.2.3.2 init(channel)第二步:设置新接入channel的option和attr

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
    currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
    currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

这里,和上面类似,只不过不是设置当前channel的这两个属性,而是设置新进来连接对应的channel,由于我们这篇文章只关心到server如何启动,接入连接放到下一篇文章中详细剖析。

3.2.3.3 init(channel)第三步:加入新连接处理器

p.addLast(new ChannelInitializer<Channel>() {   // ChannelPipeline是一个非循环双向链表,最后面插入一个元素
        @Override
        public void initChannel(Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();  // pipeline
            ChannelHandler handler = config.handler();  //handler
            if (handler != null) { 
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
//
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });

到了最后一步,含义:p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,从名字上就可以看出来,这是一个接入器Acceptor,专门接受新请求(accept请求),把新的请求扔给某个事件循环器,我们先不做过多分析。

来,我们总结一下,我们发现其实init也没有启动服务,只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,我们还得继续往下跟。

3.2.4 initAndRegister()第三步,register(channel)方法将这个新建并初始化好的channel注册到某个对象(这里就是打印控制台三句的地方了)

3.2.4.1 register(channel)方法第一步:register0()执行之前

这一步,我们是分析如下方法

ChannelFuture regFuture = config().group().register(channel);
调用到 NioEventLoop 中的register

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

好了,到了这一步,还记得这里的unsafe()返回的应该是什么对象吗?不记得的话可以看下前面关于unsafe的描述,或者最快的方式就是debug到这边,跟到register方法里面,看看是哪种类型的unsafe。

我们跟进去之后发现是:

AbstractUnsafe类
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // ...
    AbstractChannel.this.eventLoop = eventLoop;  // 先将EventLoop事件循环器绑定到该NioServerSocketChannel上
    // ...
    register0(promise);     //  调用 register0()


}

这里我们依然只需要focus重点,先将EventLoop事件循环器绑定到该NioServerSocketChannel上,然后调用 register0()。

3.2.4.2 register(channel)方法第二步:register0()执行,控制台打印handlerAdded、channelRegistered

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();   // 控制台打印第一句 handlerAdded

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();  // 控制台打印第二句 channelRegistered
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();   // 这里判断条件不成立,不是打印第三句的地方
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

这一段其实也很清晰,先调用 doRegister();,具体干啥待会再讲,然后调用invokeHandlerAddedIfNeeded(), 于是乎,控制台第一行打印出来的就是

handlerAdded

关于最终是如何调用到的,我们后面详细剖析pipeline的时候再讲

然后调用 pipeline.fireChannelRegistered(); 调用之后,控制台的显示为

handlerAdded
channelRegistered

3.2.4.3 register(channel)方法第三步:register0()执行,控制台不打印第三句

继续往下跟

if (isActive()) {
    if (firstRegistration) {
        pipeline.fireChannelActive();
    } else if (config().isAutoRead()) {
        beginRead();
    }
}

读到这,你可能会想当然地以为,控制台最后一行

pipeline.fireChannelActive();

由这行代码输出,我们不妨先看一下 isActive() 方法

@Override
public boolean isActive() {
    return javaChannel().socket().isBound();
}

最终调用到jdk中(注意:这里从Netty的源码到JavaSE的源码了)

ServerSocket类
/**
 * Returns the binding state of the ServerSocket.
 *
 * @return true if the ServerSocket succesfuly bound to an address
 * @since 1.4
 */
public boolean isBound() {
    // Before 1.3 ServerSockets were always bound during creation
    return bound || oldImpl;
}

这里isBound()返回false,但是从目前我们跟下来的流程看,我们并没有将一个ServerSocket绑定到一个address,所以 isActive() 返回false,我们没有成功进入到pipeline.fireChannelActive();方法,那么最后一行到底是谁输出的呢?

3.3 doBind0()执行,控制台打印第三句

3.3.1 doBind0():控制台打印第三句

如果你看到方法的最近的发起端是一个线程Runnable的run方法,那么就在提交Runnable对象方法的地方打一个断点,去掉其他断点,重新debug,比如我们首次debug发现调用栈中的最近的一个Runnable如下

if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();   //打印控制台第三句,那么是哪里调用这串代码呢?这是主要问题
        }
    });
}

我们停在了这一行pipeline.fireChannelActive();, 我们想看最初始的调用,就得跳出来,断点打到 if (!wasActive && isActive()),因为netty里面很多任务执行都是异步线程即reactor线程调用的(任务分为两种,非定时任务和定时任务,可以reactor线程调用,可以业务线程调动),如果我们要查看最先发起的方法调用,我们必须得查看Runnable被提交的地方,逐次递归下去,就能找到那行"消失的代码"

最终,通过这种方式,终于找到了 pipeline.fireChannelActive(); 的发起调用的代码,不巧,刚好就是下面的doBind0()方法

doBind0()

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
            
        channel.eventLoop().execute(new Runnable() {   // doBind0()方法中,是通过包装一个Runnable来实现异步化task的
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
        
    }

我们发现,doBind0()方法中,是通过包装一个Runnable来实现异步化task。

问题:什么是异步化task?

回答:reactor采用Java nio,reactor线程是一个异步线程,任务包括两种:非定时任务和定时任务,线程包括两种:reactor线程和业务线程。

好,接下来我们进入到channel.bind()方法。

3.3.2 doBind0()调用的bind()方法

AbstractChannel类
@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return pipeline.bind(localAddress);
}

发现是调用pipeline的bind方法

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

相信你对tail是什么不是很了解,可以翻到最开始,tail在创建pipeline的时候出现过,关于pipeline和tail对应的类,我后面源码系列会详细解说,这里,你要想知道接下来代码的走向,唯一一个比较好的方式就是debug 单步进入,篇幅原因,我就不详细展开

接上面,从tail.bind(localAddress);开始,来到如下:

HeadContext类
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

3.3.3 NioMessageUnsafe.bind()方法,真正控制台打印第三句的地方

3.3.3.1 NioMessageUnsafe.bind()方法

这里的unsafe就是前面提到的 AbstractUnsafe, 准确点,应该是 NioMessageUnsafe

在这里插入图片描述

我们进入到它的bind方法

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    // ...
    boolean wasActive = isActive();
    // ...
    doBind(localAddress);

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();   // 打印控制台第三句,这就是3.3.1刚开始到的那一段
            }
        });
    }
    safeSetSuccess(promise);
}

显然按照正常流程,我们前面已经分析到 isActive(); 方法返回false,进入到 doBind()之后,如果channel被激活了,就发起pipeline.fireChannelActive();调用,最终调用到用户方法,在控制台打印出了最后一行,所以到了这里,你应该清楚为什么最终会在控制台按顺序打印出那三行字了。

3.3.3.2 从Netty的doBind()方法到JavaSE的bind()方法

doBind()方法也很简单

protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        //noinspection Since15
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

最终调到了jdk里面的bind方法,这行代码过后,正常情况下,就真正进行了端口的绑定。

另外,通过自顶向下的方式分析,在调用pipeline.fireChannelActive();方法的时候,会调用到如下方法

HeadContext类
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

进入 readIfIsAutoRead
private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

分析isAutoRead方法
private volatile int autoRead = 1;
public boolean isAutoRead() {
    return autoRead == 1;
}

由此可见,isAutoRead方法默认返回true,于是进入到以下方法

public Channel read() {
    pipeline.read();
    return this;
}

最终调用到
AbstractNioUnsafe类
protected void doBeginRead() throws Exception {
    final SelectionKey selectionKey = this.selectionKey;  // 这里的this.selectionKey就是我们在前面register步骤返回的对象,前面我们在register的时候,注册ops是0
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();  // 前面我们在register的时候,注册ops是0
    if ((interestOps & readInterestOp) == 0) {  
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

这里的this.selectionKey就是我们在前面register步骤返回的对象,前面我们在register的时候,注册ops是0

金手指:回忆一下注册,AbstractNioChannel类selectionKey = javaChannel().register(eventLoop().selector, 0, this)

这里相当于把注册过的ops取出来,通过了if条件,然后调用

selectionKey.interestOps(interestOps | readInterestOp);

而这里的 readInterestOp 就是前面newChannel的时候传入的SelectionKey.OP_ACCEPT,又是标准的jdk nio的玩法,到此,你需要了解的细节基本已经差不多了,就这样结束吧!

四、面试金手指

4.1 Netty初始化流程总述

netty启动一个服务所经过的流程

1、设置启动类参数,最重要的就是设置channel;

2、创建server对应的channel,创建各大组件,包括ChannelConfig, ChannelId, ChannelPipeline, ChannelHandler, Unsafe等;

3、初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并出发addHandler,register等事件;

4、调用到jdk底层做端口绑定,并触发active事件,active触发的时候,真正做服务端口绑定。

4.2 对于服务端demo中每一行的解释

EventLoopGroup 一个死循环,不停地检测IO事件,处理IO事件,执行任务;

ServerBootstrap 是服务端的一个启动辅助类,通过给他设置一系列参数来绑定端口启动服务;

group(bossGroup, workerGroup) 我们需要两种类型的人干活,一个是老板,一个是工人,老板负责从外面接活,接到的活分配给工人干,放到这里,bossGroup的作用就是不断地accept到新的连接,将新的连接丢给workerGroup来处理;

.channel(NioServerSocketChannel.class) 表示服务端启动的是nio相关的channel,channel在netty里面是一大核心概念,可以理解为一条channel就是一个连接或者一个服务端bind动作,后面会细说;

.handler(new SimpleServerHandler() 表示服务器启动过程中,需要经过哪些流程,这里SimpleServerHandler最终的顶层接口为ChannelHander,是netty的一大核心概念,表示数据流经过的处理器,可以理解为流水线上的每一道关卡;

childHandler(new ChannelInitializer<SocketChannel>); 表示一条新的连接进来之后,该怎么处理,也就是上面所说的,老板如何给工人配活;

ChannelFuture f = b.bind(8888).sync(); 这里就是真正的启动过程了,绑定8888端口,等待服务器启动完毕,才会进入下行代码;

f.channel().closeFuture().sync(); 等待服务端关闭socket;

bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); 关闭两组死循环;

4.3 initAndRegister()

initAndRegister()方法三步骤:

步骤1:工厂模式新建一个channel

(1)使用new的方式新建一个ReflectiveChannelFactory(channelClass作为实参);

(2)反射方式新建一个NioServerSocketChannel对象。

步骤2:init这个channel

init(channel)第一步,设置option和attr;

init(channel)第二步,设置新接入channel的option和attr;

init(channel)第三步,加入新连接处理器。

步骤3:这个channel register到某个对象

register(channel)方法第一步:register0()执行之前,先将EventLoop事件循环器绑定到该NioServerSocketChannel上,然后调用 register0();

register(channel)方法第二步:register0()执行,控制台打印handlerAdded、channelRegistered;

register(channel)方法第三步:register0()执行,控制台不打印第三句。

4.4 doBind0()

略。

五、尾声

完成了。

天天打码,天天进步!!

最后

以上就是唠叨西装为你收集整理的Netty源码_01_Netty服务端初始化一、前言二、服务端启动三、Netty服务端初始化,控制台如何输出handlerAdded、channelRegistered、channelActive四、面试金手指五、尾声的全部内容,希望文章能够帮你解决Netty源码_01_Netty服务端初始化一、前言二、服务端启动三、Netty服务端初始化,控制台如何输出handlerAdded、channelRegistered、channelActive四、面试金手指五、尾声所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部