我是靠谱客的博主 健康黑米,最近开发中收集的这篇文章主要介绍【JAVA 网络编程系列】Netty -- Netty 关闭流程【JAVA 网络编程系列】Netty -- Netty 关闭流程,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

【JAVA 网络编程系列】Netty -- Netty 关闭流程

【1】Netty 关闭方法 -- shutdownGracefully()

public final class EchoServer {
 
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
 
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        EchoServerHandler echoServerHandler = new EchoServerHandler();
        try {

            ...

        } finally {
            // 优雅地关闭两个线程池
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

【2】Netty 关闭流程

【2.1】Netty 关闭流程 -- shutdownGracefully()

public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {

    @Override
    public Future<?> shutdownGracefully() {
        // 调用重载方法
        // 第一个参数为静默周期,默认2秒
        // 第二个参数为超时时间,默认15秒
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            // 调用孩子的shutdownGracefully()
            // 这里的EventExecutor就是NioEventLoop
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        // 返回的是NioEventLoopGroup的terminationFuture
        return terminationFuture();
    }
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor 
    implements OrderedEventExecutor {

    // 对应于 NioEventLoop 的 shutdownGracefully()
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        // 参数检查
        if (quietPeriod < 0) {
            throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
        }
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        // 其它线程正在执行关闭,直接返回
        if (isShuttingDown()) {
            return terminationFuture();
        }

        // 判断是否处于当前线程
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (; ; ) {
            // 再次检查其它线程正在执行关闭,直接返回
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            // 缓存当前状态
            oldState = state;
            // 更新状态
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    //五种状态
                    //private static final int ST_NOT_STARTED = 1;
                    //private static final int ST_STARTED = 2;
                    //private static final int ST_SHUTTING_DOWN = 3;
                    //private static final int ST_SHUTDOWN = 4;
                    //private static final int ST_TERMINATED = 5;
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        // 此时已经处于关闭相关的状态
                        newState = oldState;
                        // 无需再唤醒selector
                        wakeup = false;
                }
            }
            // 更新状态成功,退出循环
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        // 修改NioEventLoop的属性标识
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);

        // 若线程没有启动则启动线程
        // 真正的关闭逻辑将在主循环中处理
        if (oldState == ST_NOT_STARTED) {
            doStartThread();
        }

        // 添加一个空任务,唤醒EventLoop
        if (wakeup) {
            //protected void wakeup(boolean inEventLoop) 在NioEventLoop中被覆写如下
            //@Override
            //protected void wakeup(boolean inEventLoop) {
            //    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
            //        唤醒selector
            //        selector.wakeup();
            //    }
            //}
            wakeup(inEventLoop);
        }
        // 返回NioEventLoop的terminationFuture
        return terminationFuture();
    }
}

【2.2】Netty 关闭流程 -- NioEventLoop.protected void run()

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor 
    implements OrderedEventExecutor {

    private void doStartThread() {
        assert thread == null;
        // 真正启动线程的地方
        executor.execute(() -> {
            ...
            try {
                // protected abstract void run();
                // 该方法由子类覆写
                // 比如NioEventLoop中对run()方法的覆写
                // 在NioEventLoop类的run()方法中开启了一个主循环
                SingleThreadEventExecutor.this.run();
                // 标记启动成功
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                ...
            }
        });
    }
}

在 NioEventLoop.protected void run() 方法中真正处理 Netty 的关闭逻辑;

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override
    protected void run() {
        for (;;) {
            ...
            // 主循环中一直在处理关闭逻辑
            try {
                // 判断是否处于关闭中
                if (isShuttingDown()) {
                    // 关闭
                    closeAll();
                    // 确定关闭
                    // 若confirmShutdown()返回true则跳出循环,run方法执行完毕
                    // 若confirmShutdown()返回false则继续循环直到所有任务执行完毕
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                // 处理异常
                handleLoopException(t);
            }
        }
    }

    private void closeAll() {
        // 再次调用selectNow()方法
        selectAgain();
        // 获取selector中所有的SelectionKey
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            // 此处获取的附件就是NioServerSocketChannel
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                // 把要关闭的Channel加到集合中
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }
        // 遍历集合
        for (AbstractNioChannel ch: channels) {
            // 调用Channel的unsafe进行关闭
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
}

【2.3】Netty 关闭流程 -- unsafe.close()

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

    protected abstract class AbstractUnsafe implements Unsafe {

        @Override
        public final void close(final ChannelPromise promise) {
            assertEventLoop();
            // 调用重载方法
            close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false);
        }

        private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            // 设置promise不可取消
            if (!promise.setUncancellable()) {
                return;
            }

            // 使用closeInitiated防止重复关闭
            if (closeInitiated) {
                // 若已经开启了关闭处理,则为closeFuture设置处理监听器
                if (closeFuture.isDone()) {
                    // 已经关闭了设置promise的状态为成功
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) {
                    // 尚未完成关闭则添加监听器
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }
            // 下面的逻辑只会执行一次
            closeInitiated = true;
            // 判断Channel是否处于激活状态
            final boolean wasActive = isActive();
            // 获取写出数据时的缓存
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 置为空表示不允许再写出数据了
            this.outboundBuffer = null;
            // 关闭Channel的准备工作
            // prepareToClose()
            // 对于NioServerSocketChannel,默认为空
            // 对于NioSocketChannel覆写了该方法
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                // 若返回的closeExecutor不为null则将任务加入该线程池的任务队列中排队处理
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            // 关闭 Channel 并将所有在消息队列中的消息的状态置为 fail
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // Fail all the queued messages
                                        // 未发送的数据标记为 fail
                                        outboundBuffer.failFlushed(cause, notify);
                                        outboundBuffer.close(closeCause);
                                    }
                                    // 触发channelInactive()和channelDeregister()方法
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    // 关闭 Channel 并将所有在消息队列中的消息的状态置为 fail
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                        // Fail all the queued messages.
                        // 未发送的数据标记为 fail
                        outboundBuffer.failFlushed(cause, notify);
                        outboundBuffer.close(closeCause);
                    }
                }
                if (inFlush0) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            // 触发channelInactive()和channelDeregister()方法
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    // 触发channelInactive()和channelDeregister()方法
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

        private void doClose0(ChannelPromise promise) {
            try {
                // 关闭 Channel
                // 该方法由子类覆写
                doClose();
                // 将closeFuture设置为已关闭
                closeFuture.setClosed();
                // 将promise设置为已成功
                safeSetSuccess(promise);
            } catch (Throwable t) {
                // 处理异常
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

        private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            deregister(voidPromise(), wasActive && !isActive());
        }

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            // 设置promise不可取消
            if (!promise.setUncancellable()) {
                return;
            }
            // 若尚未注册则设置promise成功并直接返回
            if (!registered) {
                safeSetSuccess(promise);
                return;
            }
            // 加入到执行队列
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 执行注销处理
                        doDeregister();
                    } catch (Throwable t) {
                        // 处理异常
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            // 触发ChannelHandler的channelInactive方法
                            pipeline.fireChannelInactive();
                        }
                        if (registered) {
                            registered = false;
                            // 触发ChannelHandler的channelUnregistered方法
                            pipeline.fireChannelUnregistered();
                        }
                        safeSetSuccess(promise);
                    }
                }
            });
        }
    }
}
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {

    private final class NioSocketChannelUnsafe extends NioByteUnsafe {

        @Override
        protected Executor prepareToClose() {
            try {
                if (javaChannel().isOpen() && config().getSoLinger() > 0) {
                    doDeregister();
                    return GlobalEventExecutor.INSTANCE;
                }
            } catch (Throwable ignore) {

            }
            return null;
        }
    }

    @Override
    protected void doClose() throws Exception {
        super.doClose();
        // 关闭Java原生的Channel,此处为SocketChannel
        javaChannel().close();
    }
}

public class NioServerSocketChannel extends AbstractNioMessageChannel
                             implements io.netty.channel.socket.ServerSocketChannel {

    @Override
    protected void doClose() throws Exception {
        // 关闭Java原生的Channel,此处为ServerSocketChannel
        javaChannel().close();
    }
}

public abstract class AbstractNioChannel extends AbstractChannel {

    @Override
    protected void doDeregister() throws Exception {
        // 取消SelectionKey
        eventLoop().cancel(selectionKey());
    }
}

【2.4】Netty 关闭流程 -- SingleThreadEventExecutor.confirmShutdown()

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor 
    implements OrderedEventExecutor {

    protected boolean confirmShutdown() {
        // 不是正在关闭,返回false
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        // 取消定时任务
        cancelScheduledTasks();

        // 设置优雅关闭服务的开始时间
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }

        // 运行所有任务和所有shudown的钩子任务
        if (runAllTasks() || runShutdownHooks()) {
            // 当任务队列中存在任务
            // 并且所有任务执行完毕进入该分支
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }
            // 如果静默周期为0,返回true
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            // 否则添加一个空任务,返回false
            wakeup(true);
            return false;
        }

        // 运行到此表明没有任何任务在运行
        final long nanoTime = ScheduledFutureTask.nanoTime();

        // 如果已经关闭,或者超时了,返回true
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        // 如果当前时间减去上一次运行的时间在静默周期以内
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            // 添加一个空任务,并休眠100ms
            wakeup(true);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }

            return false;
        }
        // 超过了静默周期返回true
        return true;
    }
}

【2.5】Netty 关闭流程 -- 无任务运行/超过静默周期后的处理

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor 
    implements OrderedEventExecutor {

    private void doStartThread() {
        ...
        executor.execute(() -> {
            ...
            try {
                ...
            } catch (Throwable t) {
                ...
            } finally {
                // 修改状态为ST_SHUTDOWN,之后不能再添加任何任务
                for (; ; ) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                    "before run() implementation terminates.");
                }

                try {
                    // 再次执行confirmShutdown()直到没有任务或者超时
                    for (; ; ) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        // 执行 cleanup() 方法
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        // threadLock标识减一,将触发某些事件
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        // NioEventLoop的terminationFuture已成功
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        });
    }
}

 【2.5.1】protected void cleanup() 分析

public final class NioEventLoop extends SingleThreadEventLoop {

    // 关闭 selector
    @Override
    protected void cleanup() {
        try {
            selector.close();
        } catch (IOException e) {
            logger.warn("Failed to close a selector.", e);
        }
    }
}

【2.5.2】threadLock.release() 分析

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor 
    implements OrderedEventExecutor {

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        if (inEventLoop()) {
            throw new IllegalStateException("cannot await termination of the current thread");
        }

        // tryAcquire()的作用是尝试的获得1个许可,如果获取不到则返回false
        if (threadLock.tryAcquire(timeout, unit)) {
            // 释放一个许可
            threadLock.release();
        }

        return isTerminated();
    }
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        // 循环每一个NioEventLoop,等待它们终止
        loop: for (EventExecutor l: children) {
            for (;;) {
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    break loop;
                }
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }
}

【2.6】NioEventLoopGroup/NioEventLoop 中 terminationFuture 之间的关联建立

NioEventLoopGroup 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;NioEventLoop 的 terminationFuture 是在其 shutdownGracefully() 方法中返回;

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        ...
        // 创建一个监听器
        final FutureListener<Object> terminationListener = future -> {
            // 每个孩子完成时,terminatedChildren加一
            // 如果等于孩子数量,说明全部完成了
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        };

        // 给每个孩子都添加上这个监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    }
}

参考致谢

本博客为博主学习笔记,同时参考了网上众博主的博文以及相关专业书籍,在此表示感谢,本文若存在不足之处,请批评指正。

【1】慕课专栏,网络编程之Netty一站式精讲

【2】极客时间,Netty源码剖析与实战

最后

以上就是健康黑米为你收集整理的【JAVA 网络编程系列】Netty -- Netty 关闭流程【JAVA 网络编程系列】Netty -- Netty 关闭流程的全部内容,希望文章能够帮你解决【JAVA 网络编程系列】Netty -- Netty 关闭流程【JAVA 网络编程系列】Netty -- Netty 关闭流程所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(88)

评论列表共有 0 条评论

立即
投稿
返回
顶部