我是靠谱客的博主 感性水池,最近开发中收集的这篇文章主要介绍【Netty源码】服务端源码剖析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

引入

  • 如果你对Netty的服务端启动流程不是很了解,请参考基于Netty的Server代码,以便对服务器的启动流程有个清晰的认识,才能更好的理解源码中的步骤。

  • Netty是基于Nio实现的,也有selector、serverSocketChannel、socketChannel和selectKey等,只不过Netty把这些实现都封装在了底层。

  • 开始时,ServerBootstrap实例中需要两个NioEventLoopGroup实例: 负责请求的accept操作的boss和负责请求的read、write和处理操作的work,

图解

1.NioEventLoopGroup

这里写图片描述

分析:

  • NioEventLoopGroup主要负责管理eventLoop的生命周期,EventLoop数量默认为处理器个数的两倍。

  • NioEventLoop在构造方法中做了很多工作。它的父类SingleThreadEventExecutor会调用刚才NioEventLoopGroup中的线程工厂创建一个线程,并调用NioEventLoop覆写的run()方法。而run()方法中就是最为关键的事件循环代码,它对NioEventLoop构造方法创建的Selector不断的select()出就绪的事件

2.继承关系

这里写图片描述

3.NioEventLoop

这里写图片描述

分析:每个eventLoop会维护一个selector和taskQueue,负责处理客户端请求和内部任务,如ServerSocketChannel注册和ServerSocket绑定等。

源码分析

1.构造方法

  • NioEventLoopGroup的构造方法
public NioEventLoopGroup() {  
    this(0);  
}  

public NioEventLoopGroup(int nThreads) {  
    this(nThreads, null); 
}  

public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {  
    this(nThreads, threadFactory, SelectorProvider.provider());  
}  

public NioEventLoopGroup(  
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {  
    super(nThreads, threadFactory, selectorProvider);  
}
  • MultithreadEventLoopGroup的构造方法
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  
}

分析: DEFAULT_EVENT_LOOP_THREADS 为处理器数量的两倍。

  • MultithreadEventExecutorGroup的构造方法
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }

    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

 protected EventExecutor newChild(  
            ThreadFactory threadFactory, Object... args) throws Exception {  
      return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  
}

分析:MultithreadEventExecutorGroup管理EventLoop的生命周期,其中children表示EventExecutor数组,保存eventLoop;chooser表示从children中选取一个eventLoop的策略。

  • NioEventLoop的构造方法
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {  
      super(parent, threadFactory, false);  
      if (selectorProvider == null) {  
          throw new NullPointerException("selectorProvider");  
      }  
      provider = selectorProvider;  
      selector = openSelector();  
}
  • SingleThreadEventLoop 的构造方法
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}
  • SingleThreadEventExecutor的构造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

分析:该构造函数主要的功能是初始化一个线程,并在线程内部执行NioEventLoop类的run方法,当然这个线程不会立刻执行。然后使用LinkedBlockingQueue类初始化taskQueue。

2.ServerBootstrap

(1)启动过程图解

这里写图片描述

分析:

  • 创建Channel:创建NioServerSocketChannel以及底层NIO的Channel。

  • 初始化Channel:初始化Channel和ChannelPipeline。

  • 注册事件:绑定一个EventLoop到Channel上,并将Channel和关注的SelectionKey注册到Selector上。

  • 绑定端口:绑定到某个监听端口上。

(2)doBind方法源码如下

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

分析:

  • initAndRegister方法返回一个ChannelFuture实例regFuture,通过regFuture可以判断initAndRegister执行结果。

  • 如果regFuture.isDone()为true,说明initAndRegister已经执行完,则直接执行doBind0进行socket绑定。

  • 否则regFuture添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。

  • 所以只有当initAndRegister操作结束之后才能进行bind操作。

(3)initAndRegister方法源码如下

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

分析:该函数主要负责创建服务端channel;为NioServerSocketChannel的pipeline添加handler;注册NioServerSocketChannel到selector。

(4)init方法源码如下

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

分析:该方法主要的功能是设置channel的options和attrs,然后在pipeline中添加一个ChannelInitializer对象

3.NioServerSocketChannel

(1)继承关系图解

这里写图片描述

分析:其实是对Nio的ServerSocketChannel和SelectionKey进行了封装。

(2)newSocket与NioServerSocketChannel方法源码如下

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

分析:newSocket方法利用 provider.openServerSocketChannel() 生成Nio中的ServerSocketChannel对象。然后NioServerSocketChannel方法设置SelectionKey.OP_ACCEPT事件。即通知 selector 我们对客户端的连接请求感兴趣

4.register0与bind()过程

  • init执行完,需要把当前channel注册到EventLoopGroup,其实最终目的是为了实现Nio中把ServerSocket注册到selector上,这样就可以实现client请求的监听了。

  • EventLoopGroup中维护了多个eventLoop,next方法会调用chooser策略找到下一个eventLoop,并执行eventLoop的register方法进行注册。

  • NioServerSocketChannel初始化时,会创建一个NioMessageUnsafe实例,用于实现底层的register、read、write等操作。

  • register0方法提交到eventLoop线程池中执行,这个时候会启动eventLoop中的线程,而doRegister()才是最终Nio中的注册方法,方法javaChannel()获取ServerSocketChannel。

  • ServerSocketChannel注册完之后,通知pipeline执行fireChannelRegistered方法,pipeline中维护了handler链表,通过遍历链表,执行InBound类型handler的channelRegistered方法,最终执行init中添加的ChannelInitializer handler。

  • initChannel方法最终把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,负责accept客户端请求。在pipeline中删除对应的handler。触发fireChannelRegistered方法,可以自定义handler的channelRegistered方法。

  • 到现在,ServerSocketChannel完成了初始化并注册到seletor上,启动线程执行selector.select()方法准备接受客户端请求。但ServerSocketChannel的socket还未绑定到指定端口,Netty把注册操作放到eventLoop中执行。最终由unsafe实现端口的bind操作。

  • bind完成后,且ServerSocketChannel也已经注册完成,则触发pipeline的fireChannelActive方法,所以在这里可以自定义fireChannelActive方法,默认执行tail的fireChannelActive。

  • channel.read()方法会触发pipeline的行为:最终会在pipeline中找到handler执行read方法,默认是head。

  • 大功告成!server已经启动完成。

小结

主要流程:ServerBootstrap和EventLoopGroup互相配合,真正的核心是它们创建出NioEventLoop组和NioServerSocketChannel。每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。



本人才疏学浅,若有错,请指出,谢谢!
如果你有更好的建议,可以留言我们一起讨论,共同进步!
衷心的感谢您能耐心的读完本篇博文!

参考链接:
1. Netty 4源码解析:服务端启动
2.揭开 Bootstrap 神秘的红盖头 (服务器端)

最后

以上就是感性水池为你收集整理的【Netty源码】服务端源码剖析的全部内容,希望文章能够帮你解决【Netty源码】服务端源码剖析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(56)

评论列表共有 0 条评论

立即
投稿
返回
顶部