概述
在上篇文章中,我们介绍了Netty服务器启动的流程
Netty源码解析(二)之服务器启动源码_benjam1n77的博客-CSDN博客,这篇文章中,我们继续介绍客户端是如何与服务器端建立连接的。
一. 先从EventLoopGroup和EventLoop开始
EventLoopGroup和EventLoop其实就相当于线程池(或者说线程组)和线程,下面是NioEventLoopGroup和NioEventLoop的继承类图
可以看到不管是EventLoop还是EventLoopGroup最终都继承于JDK的Executor接口。这里我们不对这两个类多做赘述,主要是介绍他们在客户端连接过程中所起的作用。
EventLoopGroup中有一个成员变量,这个数组保存的就是线程组中的线程,EventLoopGroup会通过newChild()方法对每个EventExecutor进行初始化(对于NioEventLoopGroup来说,这个EventExecutor就是NioEventLoop)。每个NioEventLoop都会绑定一个Selector,每个绑定到这个NioEventLoop的Channel都使用这个Selector。
private final EventExecutor[] children;
//NioEventLoopGroup.java
//line 143-147
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
//NioEventLoop.java
//line 135-145
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final NioEventLoop.SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
二. boss NioEventLoop接收IO连接请求
ServerBootstrap的boosEventLoop会在NioServerSocketChannel启动的过程中同时启动,并且调用他的run()方法。run()方法的逻辑比较复杂,但是我们只需要关注下面这一段代码,run()方法中会调用另外两个方法processSelectedKeys()和runAllTasks()。processSelectedKeys()方法处理Selector发现的IO任务,而runAllTasks()方法则处理普通的任务。
//NioEventLoop.run()
//line 481-501
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
在run()方法中,会在一个死循环中不断调用select()方法,获取当前的IO事件(上面的代码中没有贴出来),然后调用processSelectedKeys()方法处理这些IO事件。然后在processSelectedKeysOptimized()方法中通过一个循环依次处理所有的selectedKeys,如果发现有SelectionKey.OP_READ | SelectionKey.OP_ACCEPT类型的IO事件,则调用unsafe.read()来处理。
//NioEventLoop.java
//line 574-580
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
//NioEventLoop.java
//line 640-666
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
//NioEventLoop.java
//line 668-719
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
三. NioServerSocketChannel处理连接请求
上面的代码走到unsafe.read()这一步时,就到了下面这段代码,read()方法主要做了两件事
- 一直调用doReadMessages(readBuf)方法,处理每个连接请求,知道所有的连接请求都处理完毕
- NioServerSocketChannel的ChannelPipeline触发ChannelRead事件,触发的次数等于这次处理的连接请求的次数。并且在最后再触发一次ChannelReadComplete事件。
//AbstractNioMessageChannel.java
//line 63-122
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
四. 创建并注册NioSocketChannel
doReadMessages(List<Object> buf)方法调用accept()方法获取到JDK NIO的原生SocketChannel,然后再将其包装为一个Netty自己的NioSocketChannel,并且在他的构造方法中,会将其设置为非阻塞式的。
然后,NioServerSocketChannel的ChannelPipeline会对每一个连接请求触发一次ChannelRead事件,显然,这个事件会被Pipeline的Handler所捕获,那么NioServerSocketChannel的Pipeline有哪些Handler呢?还记得在NioServerSocketChannel注册时我们为他的Pipeline添加过一个名为ServerBootstrapAcceptor的Handler吗?我们找到他的channelRead方法,在该方法中,我们为客户端NioSocketChannel添加handler,设置channelOption,并用child EventLoopGroup对其进行注册,注册的过程与NioServerSocketChannel的注册过程基本相同,这里也就不再赘述,只需要注意NioSocketChannel注册的感兴趣的IO事件为OP_READ而不是OP_ACCEPT。
到这里,客户端与服务器端的连接就完全建立起来了。客户端与服务器端的连接过程是由boss EventLoop处理,而当连接建立完成后,后续当客户端向服务器端写数据时,该IO_READ事件则会交由child EventLoop来处理,即连接的建立和数据的读写是两个不同的线程池完成的,这是完全符合Reactor设计模式的,也是为什么Netty在高并发场景下表现如此优异的原因之一。
//NioServerSocketChannel.java
//line 146-165
protected int doReadMessages(List<Object> buf) throws Exception {
java.nio.channels.SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
//AbstractNioChannel.java
//line 79-95
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false); //将channel设置为非阻塞式的
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//ServerBootstrap.java
//line 209-229
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
//对客户端NioSocketChannel进行注册,这里是注册到childGroup上,所以该Channel后续的IO事件都由childGroup处理
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
最后
以上就是鲤鱼唇膏为你收集整理的Netty源码解析(三)之客户端的连接一. 先从EventLoopGroup和EventLoop开始 二. boss NioEventLoop接收IO连接请求三. NioServerSocketChannel处理连接请求 四. 创建并注册NioSocketChannel的全部内容,希望文章能够帮你解决Netty源码解析(三)之客户端的连接一. 先从EventLoopGroup和EventLoop开始 二. boss NioEventLoop接收IO连接请求三. NioServerSocketChannel处理连接请求 四. 创建并注册NioSocketChannel所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复