概述
引入
如果你对Netty的服务端启动流程不是很了解,请参考基于Netty的Server代码,以便对服务器的启动流程有个清晰的认识,才能更好的理解源码中的步骤。
Netty是基于Nio实现的,也有selector、serverSocketChannel、socketChannel和selectKey等,只不过Netty把这些实现都封装在了底层。
开始时,ServerBootstrap实例中需要两个NioEventLoopGroup实例: 负责请求的accept操作的boss和负责请求的read、write和处理操作的work,
图解
1.NioEventLoopGroup
分析:
NioEventLoopGroup主要负责管理eventLoop的生命周期,EventLoop数量默认为处理器个数的两倍。
NioEventLoop在构造方法中做了很多工作。它的父类SingleThreadEventExecutor会调用刚才NioEventLoopGroup中的线程工厂创建一个线程,并调用NioEventLoop覆写的run()方法。而run()方法中就是最为关键的事件循环代码,它对NioEventLoop构造方法创建的Selector不断的select()出就绪的事件
2.继承关系
3.NioEventLoop
分析:每个eventLoop会维护一个selector和taskQueue,负责处理客户端请求和内部任务,如ServerSocketChannel注册和ServerSocket绑定等。
源码分析
1.构造方法
- NioEventLoopGroup的构造方法
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
- MultithreadEventLoopGroup的构造方法
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
分析: DEFAULT_EVENT_LOOP_THREADS 为处理器数量的两倍。
- MultithreadEventExecutorGroup的构造方法
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
分析:MultithreadEventExecutorGroup管理EventLoop的生命周期,其中children表示EventExecutor数组,保存eventLoop;chooser表示从children中选取一个eventLoop的策略。
- NioEventLoop的构造方法
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
- SingleThreadEventLoop 的构造方法
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
- SingleThreadEventExecutor的构造方法
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error(
"Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
threadProperties = new DefaultThreadProperties(thread);
taskQueue = newTaskQueue();
}
分析:该构造函数主要的功能是初始化一个线程,并在线程内部执行NioEventLoop类的run方法,当然这个线程不会立刻执行。然后使用LinkedBlockingQueue类初始化taskQueue。
2.ServerBootstrap
(1)启动过程图解
分析:
创建Channel:创建NioServerSocketChannel以及底层NIO的Channel。
初始化Channel:初始化Channel和ChannelPipeline。
注册事件:绑定一个EventLoop到Channel上,并将Channel和关注的SelectionKey注册到Selector上。
绑定端口:绑定到某个监听端口上。
(2)doBind方法源码如下
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
分析:
initAndRegister方法返回一个ChannelFuture实例regFuture,通过regFuture可以判断initAndRegister执行结果。
如果regFuture.isDone()为true,说明initAndRegister已经执行完,则直接执行doBind0进行socket绑定。
否则regFuture添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。
所以只有当initAndRegister操作结束之后才能进行bind操作。
(3)initAndRegister方法源码如下
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
分析:该函数主要负责创建服务端channel;为NioServerSocketChannel的pipeline添加handler;注册NioServerSocketChannel到selector。
(4)init方法源码如下
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
分析:该方法主要的功能是设置channel的options和attrs,然后在pipeline中添加一个ChannelInitializer对象
3.NioServerSocketChannel
(1)继承关系图解
分析:其实是对Nio的ServerSocketChannel和SelectionKey进行了封装。
(2)newSocket与NioServerSocketChannel方法源码如下
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
分析:newSocket方法利用 provider.openServerSocketChannel() 生成Nio中的ServerSocketChannel对象。然后NioServerSocketChannel方法设置SelectionKey.OP_ACCEPT事件。即通知 selector 我们对客户端的连接请求感兴趣
4.register0与bind()过程
init执行完,需要把当前channel注册到EventLoopGroup,其实最终目的是为了实现Nio中把ServerSocket注册到selector上,这样就可以实现client请求的监听了。
EventLoopGroup中维护了多个eventLoop,next方法会调用chooser策略找到下一个eventLoop,并执行eventLoop的register方法进行注册。
NioServerSocketChannel初始化时,会创建一个NioMessageUnsafe实例,用于实现底层的register、read、write等操作。
register0方法提交到eventLoop线程池中执行,这个时候会启动eventLoop中的线程,而doRegister()才是最终Nio中的注册方法,方法javaChannel()获取ServerSocketChannel。
ServerSocketChannel注册完之后,通知pipeline执行fireChannelRegistered方法,pipeline中维护了handler链表,通过遍历链表,执行InBound类型handler的channelRegistered方法,最终执行init中添加的ChannelInitializer handler。
initChannel方法最终把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,负责accept客户端请求。在pipeline中删除对应的handler。触发fireChannelRegistered方法,可以自定义handler的channelRegistered方法。
到现在,ServerSocketChannel完成了初始化并注册到seletor上,启动线程执行selector.select()方法准备接受客户端请求。但ServerSocketChannel的socket还未绑定到指定端口,Netty把注册操作放到eventLoop中执行。最终由unsafe实现端口的bind操作。
bind完成后,且ServerSocketChannel也已经注册完成,则触发pipeline的fireChannelActive方法,所以在这里可以自定义fireChannelActive方法,默认执行tail的fireChannelActive。
channel.read()方法会触发pipeline的行为:最终会在pipeline中找到handler执行read方法,默认是head。
大功告成!server已经启动完成。
小结
主要流程:ServerBootstrap和EventLoopGroup互相配合,真正的核心是它们创建出NioEventLoop组和NioServerSocketChannel。每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。
本人才疏学浅,若有错,请指出,谢谢!
如果你有更好的建议,可以留言我们一起讨论,共同进步!
衷心的感谢您能耐心的读完本篇博文!
参考链接:
1. Netty 4源码解析:服务端启动
2.揭开 Bootstrap 神秘的红盖头 (服务器端)
最后
以上就是感性水池为你收集整理的【Netty源码】服务端源码剖析的全部内容,希望文章能够帮你解决【Netty源码】服务端源码剖析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复