我是靠谱客的博主 称心小土豆,最近开发中收集的这篇文章主要介绍netty 旷视科技_Netty中NioEventLoop源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

准备赶趟上春招实习的车,刚开始学netty,感觉项目很简单,所以就想看看源码,非科班看这些是真吃力。。放点小笔记,都是站在巨人的肩膀上总结的(小抄),加油!

前置芝士:

需要知道Executor执行器的一些操作。

Demo

public class NettyServer {

int port;

public NettyServer(int port) {

this.port = port;

}

public void start() {

ServerBootstrap bootstrap = new ServerBootstrap();

EventLoopGroup boss = new NioEventLoopGroup(1);

EventLoopGroup work = new NioEventLoopGroup();

try {

bootstrap.group(boss, work)

.handler(new LoggingHandler(LogLevel.DEBUG))

.channel(NioServerSocketChannel.class)

.childHandler(new ChatRoomServerInitializer());

ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();

System.out.println("http server started. port : " + port);

f.channel().closeFuture().sync();

} catch (Exception e) {

e.printStackTrace();

} finally {

boss.shutdownGracefully();

work.shutdownGracefully();

}

}

public static void main(String[] args) {

NettyServer server = new NettyServer(8080);// 8080为启动端口

server.start();

}

}

上面是一段比较简单的Netty服务端的代码,我们主要关注:

EventLoopGroup boss = new NioEventLoopGroup(1); // 用于新连接接入的Group,初始化为1

EventLoopGroup work = new NioEventLoopGroup(); // 用于处理channel中的io事件以及任务的group

NioEventLoopGroup初始化过程

跟进到上述构造函数中,最后会来到MultithreadEventLoopGroup 类中的构造函数:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {

super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

}

传入的参数就是指定Group的大小,默认大小 DEFAULT_EVENT_LOOP_THREADS 是 Runtime.getRuntime().availableProcessors() * 2 也就是两倍的CPU数。

继续跟会来到:

# MultithreadEventExecutorGroup.java

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,

EventExecutorChooserFactory chooserFactory, Object... args) {

if (nThreads <= 0) {

throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));

}

// 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程

if (executor == null) {

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

}

children = new EventExecutor[nThreads];

for (int i = 0; i < nThreads; i ++) {

boolean success = false;

try {

// 创建eventLoop

children[i] = newChild(executor, args);

success = true;

} catch (Exception e) {

// TODO: Think about if this is a good exception type

throw new IllegalStateException("failed to create a child event loop", e);

} finally {

// 有创建失败的eventLoop就关闭所有之前创建的

if (!success) {

for (int j = 0; j < i; j ++) {

children[j].shutdownGracefully();

}

for (int j = 0; j < i; j ++) {

EventExecutor e = children[j];

try {

while (!e.isTerminated()) {

e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);

}

} catch (InterruptedException interrupted) {

// Let the caller handle the interruption.

Thread.currentThread().interrupt();

break;

}

}

}

}

}

// 创建选择器

chooser = chooserFactory.newChooser(children);

final FutureListener terminationListener = new FutureListener() {

@Override

public void operationComplete(Future future) throws Exception {

if (terminatedChildren.incrementAndGet() == children.length) {

terminationFuture.setSuccess(null);

}

}

};

for (EventExecutor e: children) {

e.terminationFuture().addListener(terminationListener);

}

Set childrenSet = new LinkedHashSet(children.length);

Collections.addAll(childrenSet, children);

readonlyChildren = Collections.unmodifiableSet(childrenSet);

}

从类名就可以知道这是多线程的线程组,这里主要完成几件事情:

new ThreadPerTaskExecutor() [线程创建器]

for(){ new Child() } [构造NioEventLoop]

chooserFactory.newChooser() [线程选择器]

线程创建器

先看看名字,是给每个任务创建一个线程的线程创建器,其保存在NioEventGroup中的executor中。主要是为每一个NioEventLoop创建一个对应的线程,1:1。

# ThreadPerTaskExecutor.java

public final class ThreadPerTaskExecutor implements Executor {

private final ThreadFactory threadFactory;

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {

if (threadFactory == null) {

throw new NullPointerException("threadFactory");

}

// 传入一个线程工厂

this.threadFactory = threadFactory;

}

@Override

public void execute(Runnable command) {

// 在执行exector的execute()方法时,调用线程工厂创建线程,并start()

threadFactory.newThread(command).start();

}

}

上述代码其实就是初始化NioEventGroup中的executor为一个线程工厂,通过之后调用execute()方法为将来的NioEventLoop创建线程来一一对应。

打住,先来看看NioEventLoop的继承关系:

NioEventLoop%E7%BB%A7%E6%89%BF%E5%9B%BE.png

这图挂了。。

可知NioEventLoop本身就是一个单线程的EventExecutor,因此有下面创建线程组数组

children = new EventExecutor[nThreads];

而实例化创建EventLoop在函数newChild()中。

构造NioEventLoop

我们跟进到构造NioEventLoop的函数newChild():

# NioEventLoopGroup.java

@Override

protected EventLoop newChild(Executor executor, Object... args) throws Exception {

return new NioEventLoop(this, executor, (SelectorProvider) args[0],

((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);

}

继续跟来到NioEventLoop的构造函数:

# NioEventLoop.java

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,

SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {

// 父类单线程的构造方法,传入的参数executor是group中的executor

super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

if (selectorProvider == null) {

throw new NullPointerException("selectorProvider");

}

if (strategy == null) {

throw new NullPointerException("selectStrategy");

}

provider = selectorProvider;

selector = openSelector(); // 创建selector事件轮询器到NioEventLoop上

selectStrategy = strategy;

}

先跟进父类的构造方法:

# SingleThreadEventLoop.java

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,

boolean addTaskWakesUp, int maxPendingTasks,

RejectedExecutionHandler rejectedExecutionHandler) {

super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);

tailTasks = newTaskQueue(maxPendingTasks); // 创建一个Mpsc任务队列,多生产者,单个消费者的任务队列

}

上述代码父类构造除了继续调用父类构造外,创建了一个Mpsc任务队列,外部线程的任务会被加入到这个任务中,保证只有一个线程去处理这些任务,保证线程安全。

# SingleThreadEventExecutor.java

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,

boolean addTaskWakesUp, int maxPendingTasks,

RejectedExecutionHandler rejectedHandler) {

super(parent);

this.addTaskWakesUp = addTaskWakesUp;

this.maxPendingTasks = Math.max(16, maxPendingTasks);

// 这是当前NioEventLoop所包含的executor

this.executor = ObjectUtil.checkNotNull(executor, "executor");

taskQueue = newTaskQueue(this.maxPendingTasks); // 创建一个普通任务队列

rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");

}

看博客说关于wakeup的代码比较旧,就不看了。????

最终父类会创建一个单线程的线程创建器SingleThreadEventExecutor,除此之外,还保存了executor到了当前NioEventLoop中,也就是前面一路下来的group中的executor,帮你回忆一下:

children[i] = newChild(executor, args);

保存这个executor主要是为了之后调用NioEventLoop的execute()方法时其实就是调用传入的这个执行器,也就是executor.execute()。

然后是创建事件轮询器:

selector = openSelector();

# NioEventLoop.java

// 只截取了关心的一些代码

private Selector openSelector() {

final Selector selector;

try {

selector = provider.openSelector();

} catch (IOException e) {

throw new ChannelException("failed to open a new selector", e);

}

if (DISABLE_KEYSET_OPTIMIZATION) {

return selector;

}

... ...

}

其中这个provider是一个java.nio中的SelectorProvider,也就是调用jdk创建了一个selector绑定到NioEventLoop上。

线程选择器

# MultithreadEventExecutorGroup.java

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {

this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);

}

在构造函数中,选择器参数传入的是一个默认选择器工厂的实例(单例模式)。

chooser = chooserFactory.newChooser(children);

将线程组交给线程选择器,跟进到chooserFactory.newChooser()

# DefaultEventExecutorChooserFactory.java

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

// 单例

public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

private DefaultEventExecutorChooserFactory() { }

@SuppressWarnings("unchecked")

@Override

public EventExecutorChooser newChooser(EventExecutor[] executors) {

if (isPowerOfTwo(executors.length)) {

return new PowerOfTowEventExecutorChooser(executors);

} else {

return new GenericEventExecutorChooser(executors);

}

}

// 判断长度是否是2的幂

private static boolean isPowerOfTwo(int val) {

return (val & -val) == val;

}

// 如果是就用这个分配

private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {

private final AtomicInteger idx = new AtomicInteger();

private final EventExecutor[] executors;

PowerOfTowEventExecutorChooser(EventExecutor[] executors) {

this.executors = executors;

}

@Override

public EventExecutor next() {

return executors[idx.getAndIncrement() & executors.length - 1]; // 用与运算

}

}

// 否则就是普通的分配

private static final class GenericEventExecutorChooser implements EventExecutorChooser {

private final AtomicInteger idx = new AtomicInteger();

private final EventExecutor[] executors;

GenericEventExecutorChooser(EventExecutor[] executors) {

this.executors = executors;

}

@Override

public EventExecutor next() {

return executors[Math.abs(idx.getAndIncrement() % executors.length)];

}

}

}

选择器策略是一个一个往后分配,循环遍历整个线程组给新连接绑定对应的NioEventLoop,实际的调用在MultithreadEventExecutorGroup.next()。

还做了以下的优化:

先判断线程组数组的长度是否是2的幂

如果是,则调用PowerOfTowEventExecutorChooser(),使用的是位运算替代%,效率比较高

否则就是GenericEventExecutorChooser()

NioEventLoop启动过程

NioEventLoop启动触发器:

服务端启动绑定端口

新连接接入通过chooser绑定一个NioEventLoop

以绑定端口为例跟一下启动过程,先说一下总的逻辑:

bind() -> execute(task) [入口]

startThread() -> doStartThread() [创建线程]

ThreadPerTaskExecutor.execute()

thread = Thread.currentThread()

NioEventLoop.run() [启动]

先跟进到入口,bind方法:

# AbstractBootstrap.java

channel.eventLoop().execute(new Runnable() {

@Override

public void run() {

if (regFuture.isSuccess()) {

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

} else {

promise.setFailure(regFuture.cause());

}

}

});

将端口绑定作为一个Runnable任务去调用NioEventLoop.execute()方法,具体实现在父类中。

# SingleThreadEventExecutor.java

@Override

public void execute(Runnable task) {

if (task == null) {

throw new NullPointerException("task");

}

boolean inEventLoop = inEventLoop();

if (inEventLoop) {

addTask(task);

} else {

startThread();

addTask(task);

if (isShutdown() && removeTask(task)) {

reject();

}

}

if (!addTaskWakesUp && wakesUpForTask(task)) {

wakeup(inEventLoop);

}

}

先留意下inEventLoop()这个方法,能够解决netty的异步串行无锁化。

直接跟到startThread()中,还是在这个类中:

private void startThread() {

if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {

if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 如果线程状态为未启动,就cas设置成启动状态,然后执行下面方法

doStartThread();

}

}

}

// 只贴关心的代码

private void doStartThread() {

assert thread == null;

executor.execute(new Runnable() {

@Override

public void run() {

thread = Thread.currentThread(); // 保留当前线程信息,绑定端口任务一开始的线程就是main线程

if (interrupted) {

thread.interrupt();

}

boolean success = false;

updateLastExecutionTime();

try {

SingleThreadEventExecutor.this.run(); // for循环执行任务队列的任务

success = true;

} catch (Throwable t) {

... ...

}

... ...

}

... ...

}

}

上述代码中的executor是NioEventLoop所包含的那个,根据之前创建过程可以知道,这个execute最后会调用NioEventLoopGroup中的execute方法。

NioEventLoop执行流程

先说一下总的执行逻辑:

run() -> for(;;)

select() [检查是否有io事件]

processSelectedKeys() [处理io事件]

runAllTasks() [处理异步任务队列]

run()方法中有一个for循环,一共做三件事,select注册到轮询器上的channel中的io事件,然后调用processSelectedKeys()处理轮询出来的io事件,runAllTasks()处理外部线程扔到taskqueue中的任务。

跟进到SingleThreadEventExecutor.this.run():

# NioEventLoop.java

@Override

protected void run() {

for (;;) {

try {

// selector选择一个已经就绪的channel

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

case SelectStrategy.CONTINUE:

continue;

case SelectStrategy.SELECT: // SelectStrategy.SELECT == -1,说明此时任务队列中没有任务

// 阻塞式选择

select(wakenUp.getAndSet(false));

... ...

if (wakenUp.get()) {

selector.wakeup();

}

default:

// fallthrough

}

cancelledKeys = 0;

needsToSelectAgain = false;

final int ioRatio = this.ioRatio; // 这是一个处理io和处理任务队列任务的比值

if (ioRatio == 100) {

try {

processSelectedKeys(); // 处理就绪channel的io

} finally {

// Ensure we always run tasks.

runAllTasks(); // 执行任务队列中的任务

}

} else {

final long ioStartTime = System.nanoTime();

try {

processSelectedKeys();

} finally {

// Ensure we always run tasks.

final long ioTime = System.nanoTime() - ioStartTime;

runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

}

}

} catch (Throwable t) {

handleLoopException(t);

}

// Always handle shutdown even if the loop processing threw an exception.

try {

if (isShuttingDown()) {

closeAll();

if (confirmShutdown()) {

return;

}

}

} catch (Throwable t) {

handleLoopException(t);

}

}

}

ioRatio默认是50,说明处理io和任务处理时间是1:1的。

接下来主要讲一下之前提到的三个过程。

select()方法执行逻辑

这一段逻辑就是在以下代码:

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

case SelectStrategy.CONTINUE:

continue;

case SelectStrategy.SELECT:

select(wakenUp.getAndSet(false));

跟进到主要方法select()中,还是在这个类中:

private void select(boolean oldWakenUp) throws IOException {

Selector selector = this.selector;

try {

int selectCnt = 0;

long currentTimeNanos = System.nanoTime();

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {

// 判断是否超时

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

// 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break

if (timeoutMillis <= 0) {

if (selectCnt == 0) {

selector.selectNow();

selectCnt = 1;

}

break;

}

// If a task was submitted when wakenUp value was true, the task didn't get a chance to call

// Selector#wakeup. So we need to check task queue again before executing select operation.

// If we don't, the task might be pended until select operation was timed out.

// It might be pended until idle timeout if IdleStateHandler existed in pipeline.

// 异步任务队列中是否有任务,如果有任务的话,就直接执行,并+1返回

if (hasTasks() && wakenUp.compareAndSet(false, true)) {

selector.selectNow();

selectCnt = 1;

break;

}

// 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s

int selectedKeys = selector.select(timeoutMillis);

selectCnt ++;

// 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

// - Selected something,

// - waken up by user, or

// - the task queue has a pending task.

// - a scheduled task is ready for processing

break;

}

if (Thread.interrupted()) {

// Thread was interrupted so reset selected keys and break so we not run into a busy loop.

// As this is most likely a bug in the handler of the user or it's client library we will

// also log it.

//

// See https://github.com/netty/netty/issues/2426

if (logger.isDebugEnabled()) {

logger.debug("Selector.select() returned prematurely because " +

"Thread.currentThread().interrupt() was called. Use " +

"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");

}

selectCnt = 1;

break;

}

// 执行到这里说明以及进行了一次阻塞式的select操作

long time = System.nanoTime();

if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

// timeoutMillis elapsed without anything selected.

// 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1

selectCnt = 1;

} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&

selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

// 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector()

// The selector returned prematurely many times in a row.

// Rebuild the selector to work around the problem.

logger.warn(

"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",

selectCnt, selector);

rebuildSelector();

selector = this.selector;

// Select again to populate selectedKeys.

selector.selectNow();

selectCnt = 1;

break;

}

currentTimeNanos = time;

}

if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {

if (logger.isDebugEnabled()) {

logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",

selectCnt - 1, selector);

}

}

} catch (CancelledKeyException e) {

if (logger.isDebugEnabled()) {

logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",

selector, e);

}

// Harmless exception - log anyway

}

}

注意:

delayNanos(currentTimeNanos):用来计算当前定时任务队列第一个定时任务还有多久执行。

selector.selectNow():非阻塞选择

看了博客补充一下:Selector 的阻塞选择和非阻塞选择的区别就是,非阻塞选择在当前 select方法执行时判断循环判断所有的 channel 是否就绪并返回所有的就绪数量,而阻塞式选择则是阻塞指定时间直至阻塞时间内获取到就绪 channel 或者阻塞时间超时时立刻返回。

这个select()方法主要干了几件事情:

第一件事情,deadline以及任务穿插逻辑处理:

首先计算deadline,也就是定时任务的执行时间,计算出一个超时时间timeoutMillis,也就是距离最近一次定时任务开始的时间,如果小于0,说明要执行定时任务,则执行一次非阻塞的选择:

// 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break

if (timeoutMillis <= 0) {

if (selectCnt == 0) {

selector.selectNow();

selectCnt = 1;

}

break;

}

第二件事,select阻塞式选择:

// 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s

int selectedKeys = selector.select(timeoutMillis);

selectCnt ++;

// 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止

if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

// - Selected something,

// - waken up by user, or

// - the task queue has a pending task.

// - a scheduled task is ready for processing

break;

}

第三件事,解决jdk空轮询bug:

// 执行到这里说明以及进行了一次阻塞式的select操作

long time = System.nanoTime();

if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

// timeoutMillis elapsed without anything selected.

// 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1

selectCnt = 1;

} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&

selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

// 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector()

// The selector returned prematurely many times in a row.

// Rebuild the selector to work around the problem.

logger.warn(

"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",

selectCnt, selector);

rebuildSelector();

selector = this.selector;

// Select again to populate selectedKeys.

selector.selectNow();

selectCnt = 1;

break;

}

其中阈值SELECTOR_AUTO_REBUILD_THRESHOLD默认512,因此这个selectCnt主要就是用来记录空轮询的次数。

这个解决空轮询的方法其实是有点乐观的,他并没有从根源上解决, 而是rebuildSelector(),期盼着下一次能够不出现空轮询。

processSelectedKey()执行逻辑

NioEventLoop的第二个过程就是处理检测到的io事件。

先来看看netty对于selectKey做了什么小动作。

select操作每次会把就绪状态的io事件添加到底层的hashset当中,而netty会通过反射把这个hashset修改成数组,这样添加的操作就是O(1)的时间复杂度,具体过程如下:

# NioEventLoop.java

private Selector openSelector() {

final Selector selector;

try {

selector = provider.openSelector();

} catch (IOException e) {

throw new ChannelException("failed to open a new selector", e);

}

// 如果不需要优化,就直接返回原生的selector,默认为false

if (DISABLE_KEYSET_OPTIMIZATION) {

return selector;

}

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

... ...

}

在之前创建事件轮询器的代码中,做了这方面的优化,他用一个SelectedSelectionKeySet来替换底层的keyset的数据结构:

# SelectedSelectionKeySet.java

final class SelectedSelectionKeySet extends AbstractSet {

private SelectionKey[] keysA;

private int keysASize;

private SelectionKey[] keysB;

private int keysBSize;

private boolean isA = true;

SelectedSelectionKeySet() {

keysA = new SelectionKey[1024];

keysB = keysA.clone();

}

@Override

public boolean add(SelectionKey o) {

if (o == null) {

return false;

}

if (isA) {

int size = keysASize;

keysA[size ++] = o;

keysASize = size;

if (size == keysA.length) {

doubleCapacityA();

}

} else {

int size = keysBSize;

keysB[size ++] = o;

keysBSize = size;

if (size == keysB.length) {

doubleCapacityB();

}

}

return true;

}

...

}

这个数据结构其实就一个方法有用就是add()方法,是由一个数组和size的方法实现添加的,原来的HashSet的添加在最坏情况下为O(n)。

final Class> selectorImplClass = (Class>) maybeSelectorImplClass;

Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {

@Override

public Object run() {

try {

// 获取到成员变量selectedKeys和publicSelectedKeys

Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");

Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

selectedKeysField.setAccessible(true); // 允许修改

publicSelectedKeysField.setAccessible(true);

selectedKeysField.set(selector, selectedKeySet); // 进行替换

publicSelectedKeysField.set(selector, selectedKeySet);

return null;

} catch (NoSuchFieldException e) {

return e;

} catch (IllegalAccessException e) {

return e;

} catch (RuntimeException e) {

// JDK 9 can throw an inaccessible object exception here; since Netty compiles

// against JDK 7 and this exception was only added in JDK 9, we have to weakly

// check the type

if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {

return e;

} else {

throw e;

}

}

}

});

if (maybeException instanceof Exception) {

selectedKeys = null;

Exception e = (Exception) maybeException;

logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);

} else {

selectedKeys = selectedKeySet;

logger.trace("instrumented a special java.util.Set into: {}", selector);

}

return selector;

还是在这个方法中,当我们构造好这个数组后,通过反射,修改原来selector中的属性selectedKeys和publicSelectedKeys为上面构造好的数组selectedKeySet(一个披着set皮的array)。

在openSelector()方法的最后也将selectedKeySet保存成一个NioEventLoop的成员变量。

对selected keySet优化完后,开始对这些就绪事件进行处理,调用processSelectedKeysOptimized(),跟到run()方法中的processSelectedKey():

# NioEventLoop.java

private void processSelectedKeys() {

if (selectedKeys != null) { // 这个key是优化过的key

processSelectedKeysOptimized(selectedKeys.flip());

} else {

processSelectedKeysPlain(selector.selectedKeys());

}

}

selectedKeys.flip():返回底层的数组,也就是selectedKeys背后真正的keysA数组,继续跟进 :

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {

for (int i = 0;; i ++) {

final SelectionKey k = selectedKeys[i];

if (k == null) {

break;

}

// null out entry in the array to allow to have it GC'ed once the Channel close

// See https://github.com/netty/netty/issues/2363

selectedKeys[i] = null; // 便于GC

final Object a = k.attachment(); // 拿到key的attachment,也就是一个经过netty封装的channel

if (a instanceof AbstractNioChannel) { // 如果是netty的channel

processSelectedKey(k, (AbstractNioChannel) a);

} else {

@SuppressWarnings("unchecked")

NioTask task = (NioTask) a;

processSelectedKey(k, task);

}

if (needsToSelectAgain) {

// null out entries in the array to allow to have it GC'ed once the Channel close

// See https://github.com/netty/netty/issues/2363

for (;;) {

i++;

if (selectedKeys[i] == null) {

break;

}

selectedKeys[i] = null;

}

selectAgain();

// Need to flip the optimized selectedKeys to get the right reference to the array

// and reset the index to -1 which will then set to 0 on the for loop

// to start over again.

//

// See https://github.com/netty/netty/issues/1523

selectedKeys = this.selectedKeys.flip();

i = -1;

}

}

}

显然需要继续跟进到方法processSelectedKey(k, (AbstractNioChannel) a)中:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // unsafe也是和channel唯一绑定的

if (!k.isValid()) { // 如果key不合法,需要关闭channel

final EventLoop eventLoop;

try {

eventLoop = ch.eventLoop();

} catch (Throwable ignored) {

// If the channel implementation throws an exception because there is no event loop, we ignore this

// because we are only trying to determine if ch is registered to this event loop and thus has authority

// to close ch.

return;

}

// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop

// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is

// still healthy and should not be closed.

// See https://github.com/netty/netty/issues/5125

if (eventLoop != this || eventLoop == null) {

return;

}

// close the channel if the key is not valid anymore

unsafe.close(unsafe.voidPromise());

return;

}

try {

int readyOps = k.readyOps();

// 判断就绪事件类型

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

int ops = k.interestOps();

ops &= ~SelectionKey.OP_CONNECT;

k.interestOps(ops);

unsafe.finishConnect();

}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.

if ((readyOps & SelectionKey.OP_WRITE) != 0) {

// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write

ch.unsafe().forceFlush();

}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead

// to a spin loop

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 主要关心read和accept事件

unsafe.read();

if (!ch.isOpen()) {

// Connection already closed - no need to handle write.

return;

}

}

} catch (CancelledKeyException ignored) {

unsafe.close(unsafe.voidPromise());

}

}

看了别人博客的总结,这段逻辑就是处理就绪 channel 的 IO 事件的逻辑:

判断当前SelectionKey是否有效。失效结束处理并关闭资源。

判断当前 channel的关注事件,针对处理:获取SelectionKey的 readyOps。这里的判断逻辑都是使用高效的位运算。readyOps 为当前 SelectionKey 的就绪的事件类型。

(readyOps & SelectionKey.OP_CONNECT) != 0:连接就绪事件

这个事件在 server 端不会关注,只有 client 用来连接 server 时才会关注连接就绪事件。

连接就绪后,获取当前SelectionKey的interestOps值,将当前interestOps值修改后,调用底层 unsafe连接server

(readyOps & SelectionKey.OP_WRITE) != 0:写就绪事件

当前 channel 关注的是写就绪事件,此时写操作已经就绪,所以直接调用unsafe将数据写入网卡缓存。

(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 :当前channel关注的是读就绪事件,或者前面因为有新增任务而触发的就绪channel处理逻辑,只有因为任务触发的情况下 readyOps才可能会是 0 ,readyOps = 0 意味着没有就绪channel。

直接调用 unsafe 继续读操作,将网卡缓存的数据读取到用户空间。如果是 readyOps = 0 的情况相当于网卡缓存并没有就绪数据,则时进行的读操作不会读取到数据。

unsafe是个啥玩意?现在只知道Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力,等我有点B树了再说。

runAllTasks()执行逻辑

三件事:对task进行分类和添加、对任务进行聚合、执行任务

第一件事,对task进行分类和添加:

在之前说execute方法时,代码如下:

# SingleThreadEventExecutor.java

@Override

public void execute(Runnable task) {

if (task == null) {

throw new NullPointerException("task");

}

boolean inEventLoop = inEventLoop(); // 是否是外部线程

if (inEventLoop) {

addTask(task);

} else {

startThread();

addTask(task);

if (isShutdown() && removeTask(task)) {

reject();

}

}

if (!addTaskWakesUp && wakesUpForTask(task)) {

wakeup(inEventLoop);

}

}

先判断是否是外部线程调用的execute方法,如果是就先startThread();,再将其加入到创建NioEventLoop时创建的任务队列MpscQueue中。

除了普通任务队列还有一个定时任务队列:

# AbstractScheduledEventExecutor.java

ScheduledFuture schedule(final ScheduledFutureTask task) {

if (inEventLoop()) {

scheduledTaskQueue().add(task);

} else {

execute(new Runnable() {

@Override

public void run() {

scheduledTaskQueue().add(task);

}

});

}

return task;

对于定时任务,这里同样进行了判断,是否是外部线程,如果是外部线程在,则调用execute()方法进行线程安全的操作,即现startThread(),再添加任务,保证只有一个线程进行处理,即都在NioEventLoop中处理。

这是为什么呢?这是因为scheduledTaskQueue的实现是非线程安全的,普通优先队列:

Queue> scheduledTaskQueue() {

if (scheduledTaskQueue == null) {

scheduledTaskQueue = new PriorityQueue>();

}

return scheduledTaskQueue;

}

第二件事,任务的聚合:

# SingleThreadEventExecutor.java

protected boolean runAllTasks(long timeoutNanos) {

fetchFromScheduledTaskQueue(); // 任务的聚合

Runnable task = pollTask();

if (task == null) {

afterRunningAllTasks();

return false;

}

... ...

return true;

}

private boolean fetchFromScheduledTaskQueue() {

long nanoTime = AbstractScheduledEventExecutor.nanoTime();

Runnable scheduledTask = pollScheduledTask(nanoTime); // 取定时任务队列中当前需要允许的任务

while (scheduledTask != null) {

if (!taskQueue.offer(scheduledTask)) {

// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.

scheduledTaskQueue().add((ScheduledFutureTask>) scheduledTask);

return false;

}

scheduledTask = pollScheduledTask(nanoTime);

}

return true;

}

上述while代码中的逻辑,定时任务不为空,即有定时任务:

先讲定时任务添加到普通任务队列中;

如果添加失败,则添加回定时任务队列中,因为之前取的时候poll了,并返回false;

成功添加后,继续从定时任务中取定时任务;

while循环结束,所有定时任务都被添加到了普通任务队列,完成任务的聚合。

第三件事,任务的执行:

protected boolean runAllTasks(long timeoutNanos) {

fetchFromScheduledTaskQueue();

Runnable task = pollTask(); // 从普通任务队列中拿任务

if (task == null) {

afterRunningAllTasks();

return false;

}

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 根据超时时间计算截止时间

long runTasks = 0;

long lastExecutionTime;

for (;;) {

safeExecute(task); // 执行任务,Runnable.run()

runTasks ++;

// Check timeout every 64 tasks because nanoTime() is relatively expensive.

// XXX: Hard-coded value - will make it configurable if it is really a problem.

if ((runTasks & 0x3F) == 0) {

lastExecutionTime = ScheduledFutureTask.nanoTime();

if (lastExecutionTime >= deadline) {

break;

}

}

task = pollTask(); // 继续拿任务

if (task == null) {

lastExecutionTime = ScheduledFutureTask.nanoTime();

break;

}

}

afterRunningAllTasks();

this.lastExecutionTime = lastExecutionTime;

return true;

}

上述代码前半请看注释,关键在于对任务次数的判断:

// Check timeout every 64 tasks because nanoTime() is relatively expensive.

// XXX: Hard-coded value - will make it configurable if it is really a problem.

if ((runTasks & 0x3F) == 0) {

lastExecutionTime = ScheduledFutureTask.nanoTime();

if (lastExecutionTime >= deadline) {

break;

}

}

这里也是与操作,即(&63)== (%64),每执行64次就判断一下是否超时(因为需要和io处理时间分配时间,所以有一个超时时间),如果超时就退出,可能去进行io处理了。

为什么不每次都判断一下呢?上面的英文注释说了,nanoTime()老费时间了。

面试相关

问:Netty如何保证异步串行无锁化?

在NioEventLoop中封装了一个线程, 这个IO线程就是用来处理客户端的连接事件, 读写事件, 处理队列中的任务. 没错, 每个NioEventLoop都有一个队列, 这个队列是在创建NioEventLoop时被初始化的,netty比较重任务。这个任务队列是一个多生产者单消费者的队列,因此可以保证线程安全。根据inEventLoop()判断,如果是外部线程,也就不是我们自己的io线程,那么就把他的runnable任务放到我们的Mpsc队列中来保证线程安全,同理与定时任务队列中的任务,我们的io线程只处理普通任务中的任务,因此保证了线程之间不需要同步。

问:默认情况下,Netty服务端起多少线程?何时启动?

默认两倍CPU数,调用execute()方法判断是否在本线程内,如果是,那么就已经启动了,如果是在外部线程中,那么就需要执行startThread()方法判断线程是否启动,未启动就启动此线程。

问:Netty如何解决jdk空轮询bug?

jdk空轮询是在阻塞式select中,没有阻塞timeoutMillis时间就结束了阻塞select操作,我们称之为一次空轮询,因此判断这种空轮询操作是否超过设定的阈值(512),如果超过,就调用rebuildSelector()方法重建Selector把之前的key都移到新的轮询器上,避免bug。

问:简单说说NioEventLoop?

用户创建Boss/Worker EventLoopGroup时创建,默认创建NioEventLoop个数为2*CPU核数;每个NioEventLoop都由线程选择器chooser分配,并且用与运算优化了选择方式;每个NioEventLoop构造过程中都创建了Selector、任务队列,创建Selector时,通过反射使用数组替换集合方式保存selectedKeys ;NioEventLoop执行时调用execute()方法启动/创建FastThreadLocalThread线程,保存到该NioEventLoop的成员变量中进行一对一绑定;NioEventLoop执行逻辑在run方法中,包括检测io事件、处理io事件、执行任务队列。

PS:看了两天了这个。。非科班实在是水平不够,希望大佬们别喷,只是留个纪念,提提学习netty的建议,虚心接受-;-

最后

以上就是称心小土豆为你收集整理的netty 旷视科技_Netty中NioEventLoop源码分析的全部内容,希望文章能够帮你解决netty 旷视科技_Netty中NioEventLoop源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部