概述
【JAVA 网络编程系列】Netty -- Netty 关闭流程
【1】Netty 关闭方法 -- shutdownGracefully()
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EchoServerHandler echoServerHandler = new EchoServerHandler();
try {
...
} finally {
// 优雅地关闭两个线程池
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
【2】Netty 关闭流程
【2.1】Netty 关闭流程 -- shutdownGracefully()
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
@Override
public Future<?> shutdownGracefully() {
// 调用重载方法
// 第一个参数为静默周期,默认2秒
// 第二个参数为超时时间,默认15秒
return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
for (EventExecutor l: children) {
// 调用孩子的shutdownGracefully()
// 这里的EventExecutor就是NioEventLoop
l.shutdownGracefully(quietPeriod, timeout, unit);
}
// 返回的是NioEventLoopGroup的terminationFuture
return terminationFuture();
}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
// 对应于 NioEventLoop 的 shutdownGracefully()
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
// 参数检查
if (quietPeriod < 0) {
throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
}
if (timeout < quietPeriod) {
throw new IllegalArgumentException(
"timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
}
if (unit == null) {
throw new NullPointerException("unit");
}
// 其它线程正在执行关闭,直接返回
if (isShuttingDown()) {
return terminationFuture();
}
// 判断是否处于当前线程
boolean inEventLoop = inEventLoop();
boolean wakeup;
int oldState;
for (; ; ) {
// 再次检查其它线程正在执行关闭,直接返回
if (isShuttingDown()) {
return terminationFuture();
}
int newState;
wakeup = true;
// 缓存当前状态
oldState = state;
// 更新状态
if (inEventLoop) {
newState = ST_SHUTTING_DOWN;
} else {
switch (oldState) {
//五种状态
//private static final int ST_NOT_STARTED = 1;
//private static final int ST_STARTED = 2;
//private static final int ST_SHUTTING_DOWN = 3;
//private static final int ST_SHUTDOWN = 4;
//private static final int ST_TERMINATED = 5;
case ST_NOT_STARTED:
case ST_STARTED:
newState = ST_SHUTTING_DOWN;
break;
default:
// 此时已经处于关闭相关的状态
newState = oldState;
// 无需再唤醒selector
wakeup = false;
}
}
// 更新状态成功,退出循环
if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
break;
}
}
// 修改NioEventLoop的属性标识
gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
gracefulShutdownTimeout = unit.toNanos(timeout);
// 若线程没有启动则启动线程
// 真正的关闭逻辑将在主循环中处理
if (oldState == ST_NOT_STARTED) {
doStartThread();
}
// 添加一个空任务,唤醒EventLoop
if (wakeup) {
//protected void wakeup(boolean inEventLoop) 在NioEventLoop中被覆写如下
//@Override
//protected void wakeup(boolean inEventLoop) {
// if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
// 唤醒selector
// selector.wakeup();
// }
//}
wakeup(inEventLoop);
}
// 返回NioEventLoop的terminationFuture
return terminationFuture();
}
}
【2.2】Netty 关闭流程 -- NioEventLoop.protected void run()
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
private void doStartThread() {
assert thread == null;
// 真正启动线程的地方
executor.execute(() -> {
...
try {
// protected abstract void run();
// 该方法由子类覆写
// 比如NioEventLoop中对run()方法的覆写
// 在NioEventLoop类的run()方法中开启了一个主循环
SingleThreadEventExecutor.this.run();
// 标记启动成功
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
});
}
}
在 NioEventLoop.protected void run() 方法中真正处理 Netty 的关闭逻辑;
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
...
// 主循环中一直在处理关闭逻辑
try {
// 判断是否处于关闭中
if (isShuttingDown()) {
// 关闭
closeAll();
// 确定关闭
// 若confirmShutdown()返回true则跳出循环,run方法执行完毕
// 若confirmShutdown()返回false则继续循环直到所有任务执行完毕
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
// 处理异常
handleLoopException(t);
}
}
}
private void closeAll() {
// 再次调用selectNow()方法
selectAgain();
// 获取selector中所有的SelectionKey
Set<SelectionKey> keys = selector.keys();
Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
for (SelectionKey k: keys) {
// 此处获取的附件就是NioServerSocketChannel
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 把要关闭的Channel加到集合中
channels.add((AbstractNioChannel) a);
} else {
k.cancel();
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, k, null);
}
}
// 遍历集合
for (AbstractNioChannel ch: channels) {
// 调用Channel的unsafe进行关闭
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
}
【2.3】Netty 关闭流程 -- unsafe.close()
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void close(final ChannelPromise promise) {
assertEventLoop();
// 调用重载方法
close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
}
private void close(final ChannelPromise promise, final Throwable cause,
final ClosedChannelException closeCause, final boolean notify) {
// 设置promise不可取消
if (!promise.setUncancellable()) {
return;
}
// 使用closeInitiated防止重复关闭
if (closeInitiated) {
// 若已经开启了关闭处理,则为closeFuture设置处理监听器
if (closeFuture.isDone()) {
// 已经关闭了设置promise的状态为成功
safeSetSuccess(promise);
} else if (!(promise instanceof VoidChannelPromise)) {
// 尚未完成关闭则添加监听器
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.setSuccess();
}
});
}
return;
}
// 下面的逻辑只会执行一次
closeInitiated = true;
// 判断Channel是否处于激活状态
final boolean wasActive = isActive();
// 获取写出数据时的缓存
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
// 置为空表示不允许再写出数据了
this.outboundBuffer = null;
// 关闭Channel的准备工作
// prepareToClose()
// 对于NioServerSocketChannel,默认为空
// 对于NioSocketChannel覆写了该方法
Executor closeExecutor = prepareToClose();
if (closeExecutor != null) {
// 若返回的closeExecutor不为null则将任务加入该线程池的任务队列中排队处理
closeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Execute the close.
// 关闭 Channel 并将所有在消息队列中的消息的状态置为 fail
doClose0(promise);
} finally {
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
invokeLater(new Runnable() {
@Override
public void run() {
if (outboundBuffer != null) {
// Fail all the queued messages
// 未发送的数据标记为 fail
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
// 触发channelInactive()和channelDeregister()方法
fireChannelInactiveAndDeregister(wasActive);
}
});
}
}
});
} else {
try {
// Close the channel and fail the queued messages in all cases.
// 关闭 Channel 并将所有在消息队列中的消息的状态置为 fail
doClose0(promise);
} finally {
if (outboundBuffer != null) {
// Fail all the queued messages.
// 未发送的数据标记为 fail
outboundBuffer.failFlushed(cause, notify);
outboundBuffer.close(closeCause);
}
}
if (inFlush0) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发channelInactive()和channelDeregister()方法
fireChannelInactiveAndDeregister(wasActive);
}
});
} else {
// 触发channelInactive()和channelDeregister()方法
fireChannelInactiveAndDeregister(wasActive);
}
}
}
private void doClose0(ChannelPromise promise) {
try {
// 关闭 Channel
// 该方法由子类覆写
doClose();
// 将closeFuture设置为已关闭
closeFuture.setClosed();
// 将promise设置为已成功
safeSetSuccess(promise);
} catch (Throwable t) {
// 处理异常
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
private void fireChannelInactiveAndDeregister(final boolean wasActive) {
deregister(voidPromise(), wasActive && !isActive());
}
private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
// 设置promise不可取消
if (!promise.setUncancellable()) {
return;
}
// 若尚未注册则设置promise成功并直接返回
if (!registered) {
safeSetSuccess(promise);
return;
}
// 加入到执行队列
invokeLater(new Runnable() {
@Override
public void run() {
try {
// 执行注销处理
doDeregister();
} catch (Throwable t) {
// 处理异常
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
} finally {
if (fireChannelInactive) {
// 触发ChannelHandler的channelInactive方法
pipeline.fireChannelInactive();
}
if (registered) {
registered = false;
// 触发ChannelHandler的channelUnregistered方法
pipeline.fireChannelUnregistered();
}
safeSetSuccess(promise);
}
}
});
}
}
}
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
return null;
}
}
@Override
protected void doClose() throws Exception {
super.doClose();
// 关闭Java原生的Channel,此处为SocketChannel
javaChannel().close();
}
}
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected void doClose() throws Exception {
// 关闭Java原生的Channel,此处为ServerSocketChannel
javaChannel().close();
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doDeregister() throws Exception {
// 取消SelectionKey
eventLoop().cancel(selectionKey());
}
}
【2.4】Netty 关闭流程 -- SingleThreadEventExecutor.confirmShutdown()
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
protected boolean confirmShutdown() {
// 不是正在关闭,返回false
if (!isShuttingDown()) {
return false;
}
if (!inEventLoop()) {
throw new IllegalStateException("must be invoked from an event loop");
}
// 取消定时任务
cancelScheduledTasks();
// 设置优雅关闭服务的开始时间
if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
}
// 运行所有任务和所有shudown的钩子任务
if (runAllTasks() || runShutdownHooks()) {
// 当任务队列中存在任务
// 并且所有任务执行完毕进入该分支
if (isShutdown()) {
// Executor shut down - no new tasks anymore.
return true;
}
// 如果静默周期为0,返回true
if (gracefulShutdownQuietPeriod == 0) {
return true;
}
// 否则添加一个空任务,返回false
wakeup(true);
return false;
}
// 运行到此表明没有任何任务在运行
final long nanoTime = ScheduledFutureTask.nanoTime();
// 如果已经关闭,或者超时了,返回true
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
}
// 如果当前时间减去上一次运行的时间在静默周期以内
if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
// 添加一个空任务,并休眠100ms
wakeup(true);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
return false;
}
// 超过了静默周期返回true
return true;
}
}
【2.5】Netty 关闭流程 -- 无任务运行/超过静默周期后的处理
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
private void doStartThread() {
...
executor.execute(() -> {
...
try {
...
} catch (Throwable t) {
...
} finally {
// 修改状态为ST_SHUTDOWN,之后不能再添加任何任务
for (; ; ) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// 再次执行confirmShutdown()直到没有任务或者超时
for (; ; ) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
// 执行 cleanup() 方法
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
// threadLock标识减一,将触发某些事件
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
// NioEventLoop的terminationFuture已成功
terminationFuture.setSuccess(null);
}
}
}
});
}
}
【2.5.1】protected void cleanup() 分析
public final class NioEventLoop extends SingleThreadEventLoop {
// 关闭 selector
@Override
protected void cleanup() {
try {
selector.close();
} catch (IOException e) {
logger.warn("Failed to close a selector.", e);
}
}
}
【2.5.2】threadLock.release() 分析
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor
implements OrderedEventExecutor {
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
if (unit == null) {
throw new NullPointerException("unit");
}
if (inEventLoop()) {
throw new IllegalStateException("cannot await termination of the current thread");
}
// tryAcquire()的作用是尝试的获得1个许可,如果获取不到则返回false
if (threadLock.tryAcquire(timeout, unit)) {
// 释放一个许可
threadLock.release();
}
return isTerminated();
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long deadline = System.nanoTime() + unit.toNanos(timeout);
// 循环每一个NioEventLoop,等待它们终止
loop: for (EventExecutor l: children) {
for (;;) {
long timeLeft = deadline - System.nanoTime();
if (timeLeft <= 0) {
break loop;
}
if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
break;
}
}
}
return isTerminated();
}
}
【2.6】NioEventLoopGroup/NioEventLoop 中 terminationFuture 之间的关联建立
NioEventLoopGroup 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;NioEventLoop 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
...
// 创建一个监听器
final FutureListener<Object> terminationListener = future -> {
// 每个孩子完成时,terminatedChildren加一
// 如果等于孩子数量,说明全部完成了
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
};
// 给每个孩子都添加上这个监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
}
}
参考致谢
本博客为博主学习笔记,同时参考了网上众博主的博文以及相关专业书籍,在此表示感谢,本文若存在不足之处,请批评指正。
【1】慕课专栏,网络编程之Netty一站式精讲
【2】极客时间,Netty源码剖析与实战
最后
以上就是健康黑米为你收集整理的【JAVA 网络编程系列】Netty -- Netty 关闭流程【JAVA 网络编程系列】Netty -- Netty 关闭流程的全部内容,希望文章能够帮你解决【JAVA 网络编程系列】Netty -- Netty 关闭流程【JAVA 网络编程系列】Netty -- Netty 关闭流程所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复