我是靠谱客的博主 成就高山,最近开发中收集的这篇文章主要介绍你说这是冷知识?Netty时间轮调度算法原理分析,蚂蚁金服面试Java后端经历,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

    }

    // 创建工作线程
    workerThread = threadFactory.newThread(worker);

    // 非守护线程且 leakDetection 为 true 时检测内存是否泄漏
    leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

    // 初始化最大等待任务数
    this.maxPendingTimeouts = maxPendingTimeouts;

    // 如果创建的时间轮实例大于 64,打印日志,并且这个日志只会打印一次
    if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
        WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

构造函数中的参数相当重要,当自定义时间轮时,我们应该根据业务的范围设置合理的参数:

*   threadFactory:创建时间轮任务线程的工厂,通过这个工厂可以给我们的线程自定义一些属性(线程名、异常处理等)
*   tickDuration:时钟多长时间拨动一次,值越小,时间轮精度越高
*   unit:`tickDuration` 的单位
*   ticksPerWheel:时间轮数组大小
*   leakDetection:是否检测内存泄漏
*   maxPendingTimeouts:时间轮内最大等待的任务数

时间轮的时钟拨动时长应该根据业务设置恰当的值,如果设置的过大,可能导致任务触发时间不准确。如果设置的过小,时间轮转动频繁,任务少的情况下加载不到任务,属于一直空转的状态,会占用 CPU 线程资源。

为了防止时间轮占用过多的 CPU 资源,当创建的时间轮对象大于 64 时会以日志的方式提示。

构造函数中只是初始化了轮线程,并没有开启,当第一次往时间轮内添加任务时,线程才会开启。

#### 2.3 往时间轮内添加任务

@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    // 等待的任务数 +1
    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    // 如果时间轮内等待的任务数大于最大值,任务会被抛弃
    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts ("
            + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
            + "timeouts (" + maxPendingTimeouts + ")");
    }

    // 开启时间轮内的线程
    start();

    // 计算当前添加任务的执行时间
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // 将任务加入队列
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

任务会先保存在队列中,当时间轮的时钟拨动时才会判断是否将队列中的任务加载进时间轮。

public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        case WORKER_STATE_INIT:
            // 这里存在并发,通过 CAS 操作保证最终只有一个线程能开启时间轮的工作线程
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    while (startTime == 0) {
        try {
            // startTimeInitialized 是一个 CountDownLatch,目的是为了保证工作线程的 startTime 属性初始化
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

这里通过 CAS 加锁的方式保证线程安全,避免多次开启。

工作线程开启后,`start()` 方法会被阻塞,等工作线程的 `startTime` 属性初始化完成后才被唤醒。为什么只有等 `startTime` 初始化后才能继续执行呢?因为上面的 `newTimeout` 方法在线程开启后,需要计算当前添加进来任务的执行时间,而这个执行时间是根据 `startTime` 计算的。

#### 2.4 时间轮调度

    @Override
    public void run() {
        // 初始化 startTime.
        startTime = System.nanoTime();
        if (startTime == 0) {
            startTime = 1;
        }

        // 用来唤醒被阻塞的 HashedWheelTimer#start() 方法,保证 startTime 初始化
        startTimeInitialized.countDown();

        do {
            // 时钟拨动
            final long deadline = waitForNextTick();
            if (deadline > 0) {
                int idx = (int) (tick & mask);
                // 处理过期的任务
                processCancelledTasks();
                HashedWheelBucket bucket =
                        wheel[idx];
                // 将任务加载进时间轮
                transferTimeoutsToBuckets();
                // 执行当前时间轮槽内的任务
                bucket.expireTimeouts(deadline);
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        // 时间轮关闭,将还未执行的任务以列表的形式保存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去
        // 还未执行的任务可能会在两个地方,一:时间轮数组内,二:队列中
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }
        // 处理过期的任务
        processCancelledTasks();
    }

时间轮每拨动一次 `tick` 就会 +1,根据这个值与(时间轮数组长度 - 1)进行 `&` 运算,可以定位时间轮数组内的槽。因为 `tick` 值一直在增加,所以时间轮数组看起来就像一个不断循环的圆。

*   先初始化 `startTime` 值,因为后面任务执行的时间是根据 `startTime` 计算的
*   时钟拨动,如果时间未到,则 `sleep` 一会儿
*   处理过期的任务
*   将任务加载进时间轮
*   执行当前时钟对应时间轮内的任务
*   时间轮关闭,将所有未执行的任务封装到 `unprocessedTimeouts` 集合中,在 `stop` 方法中返回出去
*   处理过期的任务

上面简单罗列了下 `run` 方法的大概执行步骤,下面是具体方法的分析。

#### 2.5 时钟拨动

如果时间轮设置的 `tickDuration` 为 100ms 拨动一次,当时钟拨动一次后,应该计算下一次时钟拨动的时间,如果还没到就 `sleep` 一会儿,等到拨动时间再醒来。

    private long waitForNextTick() {
        // 计算时钟下次拨动的相对时间
        long deadline = tickDuration * (tick + 1);

        for (;;) {
            // 获取当前时间的相对时间
            final long currentTime = System.nanoTime() - startTime;
            // 计算距离时钟下次拨动的时间
            // 这里之所以加 999999 后再除 10000000, 是为了保证足够的 sleep 时间
            // 例如:当 deadline - currentTime = 2000002 的时候,如果不加 999999,则只睡了 2ms
            // 而 2ms 其实是未到达 deadline 时间点的,所以为了使上述情况能 sleep 足够的时间,加上 999999 后,会多睡 1ms
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            // <=0 说明可以拨动时钟了
            if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }

            // 这里是为了兼容 Windows 平台,因为 Windows 平台的调度最小单位为 10ms,如果不是 10ms 的倍数,可能会引起 sleep 时间不准确
            // See https://github.com/Netty/Netty/issues/356
            if (PlatformDependent.isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                // sleep 到下次时钟拨动
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

如果时间不到就 `sleep` 等待一会儿,为了使任务时钟准确,可以从上面的代码中看出 Netty 做了一些优化,比如 `sleepTimeMs` 的计算,Windows 平台的处理等。

#### 2.6 将任务从队列加载进时间轮

    private void transferTimeoutsToBuckets() {

        // 一次最多只处理队列中的 100000 个任务
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            // 过滤已经取消的任务
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                continue;
            }
            // 计算当前任务到执行还需要经过几次时钟拨动
            // 假设时间轮数组大小是 10,calculated 为 12,需要时间轮转动一圈加两次时钟拨动后后才能执行这个任务,因此还需要计算一下圈数
            long calculated = timeout.deadline / tickDuration;
            // 计算当前任务到执行还需要经过几圈时钟拨动
            timeout.remainingRounds = (calculated - tick) / wheel.length;
            // 有的任务可能在队列里很长时间,时间过期了也没有被调度,将这种情况的任务放在当前轮次内执行
            final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
            // 计算任务在时间轮数组中的槽
            int stopIndex = (int) (ticks & mask);
            HashedWheelBucket bucket = wheel[stopIndex];
            // 将任务放到时间轮的数组中,多个任务可能定位时间轮的同一个槽,这些任务通过以链表的形式链接
            bucket.addTimeout(timeout);
        }
    }

    void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        // 任务构成双向链表
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }        

在上面也提到过,任务刚加进来不会立即到时间轮中去,而是暂时保存到一个队列中,当时间轮时钟拨动时,会将任务从队列中加载进时间轮内。

时间轮每次最大处理 100000 个任务,因为任务的执行时间是用户自定义的,所以需要计算任务到执行需要经过多少次时钟拨动,并计算时间轮拨动的圈数。接着将任务加载进时间轮对应的槽内,可能有多个任务经过 hash 计算后定位到同一个槽,这些任务会以双向链表的结构保存,有点类似 `HashMap` 处理碰撞的情况。

#### 2.7 执行任务

    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
            // 任务执行的圈数 > 0,表示任务还需要经过 remainingRounds 圈时钟循环才能执行
            if (timeout.remainingRounds <= 0) {
                // 从链表中移除当前任务,并返回链表中下一个任务
                next = remove(timeout);
                if (timeout.deadline <= deadline) {
                    // 执行任务
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                // 过滤取消的任务
                next = remove(timeout);
            } else {
                // 圈数 -1
                timeout.remainingRounds --;
            }
            timeout = next;
        }
    }

    public void expire() {
        // 任务状态校验
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }

时间轮槽内的任务以链表形式存储,这些任务执行的时间可能会不一样,有的在当前时钟执行,有的在下一圈或者下两圈对应的时钟执行。当任务在当前时钟执行时,需要将这个任务从链表中删除,重新维护链表关系。

#### 2.8 终止时间轮

@Override
public Set<Timeout> stop() {
    // 终止时间轮的线程不能是时间轮的工作线程
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());

总结

在清楚了各个大厂的面试重点之后,就能很好的提高你刷题以及面试准备的效率,接下来小编也为大家准备了最新的互联网大厂资料。

CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

TimerTask.class.getSimpleName());

总结

在清楚了各个大厂的面试重点之后,就能很好的提高你刷题以及面试准备的效率,接下来小编也为大家准备了最新的互联网大厂资料。

CodeChina开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频】

[外链图片转存中…(img-W0331Usu-1631106428715)]

[外链图片转存中…(img-3zA42SmH-1631106428717)]

[外链图片转存中…(img-yfzxQaTH-1631106428718)]

在这里插入图片描述

最后

以上就是成就高山为你收集整理的你说这是冷知识?Netty时间轮调度算法原理分析,蚂蚁金服面试Java后端经历的全部内容,希望文章能够帮你解决你说这是冷知识?Netty时间轮调度算法原理分析,蚂蚁金服面试Java后端经历所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(54)

评论列表共有 0 条评论

立即
投稿
返回
顶部