我是靠谱客的博主 精明热狗,最近开发中收集的这篇文章主要介绍Java线程池原理解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

线程的消耗

1、其实 Java 中的线程模型是基于操作系统原生线程模型实现的,也就是说 Java 中的线程其实是基于内核线程实现的,线程的创建,析构与同步都需要进行系统调用,而系统调用需要在用户态与内核中来回切换,代价相对较高,线程的生命周期耗时包括「线程创建时间」,「线程执行任务时间」,「线程销毁时间」,创建和销毁都需要导致系统调用;

2、每个 Thread 都需要有一个内核线程的支持,也就意味着每个 Thread 都需要消耗一定的内核资源(如内核线程的栈空间),因此能创建的 Thread 是有限的;

3、线程多了,导致不可忽视的上下文切换开销。

由此可见线程的创建消耗代价是昂贵的,所以必须以线程池的形式来管理这些线程,在线程池中合理设置线程大小和管理线程,以达到以合理的创建线程大小以达到最大化收益,最小化风险的目的,对于开发人员来说,要完成任务不用关心线程如何创建,如何销毁,如何协作,只需要关心提交的任务何时完成即可,对线程的调优,监控等这些细枝末节的工作通通交给线程池来实现。

线程池类关系

先来看一张线程池的继承类关系图:
在这里插入图片描述

  • 最顶层的是Executor接口,源码里只有execute一个声明方法;
  • Executor接口的子类是ExecutorService接口,源码中声明了shutdownshutdownNowisShutdownsubmit等方法;
  • Executor接口有两个子类,一个是AbstractExecutorService抽象类,一个是ScheduledExecutorService接口;
  • AbstractExecutorService抽象类是ThreadPoolExecutor类和ForkJoinPool类的父类;
  • ScheduledExecutorService接口的实现类是ScheduledThreadPoolExecutorScheduledThreadPoolExecutor类又是ThreadPoolExecutor类的子类;
  • 最后就是上一篇中所说的6种线程池的创建方式和ThreadPoolExecutor创建方式。

线程池工作原理

线程池运行机制主要由工作线程、工作队列、拒绝策略组成,其工作原理如下所示:

  1. 如果此时线程池中的线程数量小于corePoolSize,无论线程池中的线程是否处于空闲状态,也会创建新的线程来处理被添加的任务;

  2. 如果此时线程池中的线程数量等于corePoolSize,但是阻塞队列workQueue未满,那么任务被放入阻塞队列;

  3. 如果此时线程池中的线程数量大于等于corePoolSize,阻塞队列workQueue已满,并且线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;

  4. 如果此时线程池中的线程数量大于corePoolSize,阻塞队列workQueue已满,并且线程池中的线程数量等于maximumPoolSize,那么通过 handler所指定的拒绝策略来处理此任务;

  5. 当线程池中的线程数量比corePoolSize数量要多时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。

从线程池工作原理中可以得知:

  1. 提交任务优先级为核心线程corePoolSize > 任务队列workQueue> 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务;

  2. workQueue使用的是无界限队列时(比如new LinkedBlockingQueue(),或者new ArrayBlockingQueue(Integer.MAX_VALUE);),任务可以一直向队列中添加,maximumPoolSize参数就变的无意义了。比如newFixedThreadPoolnewSingleThreadExecutor这两种线程池的工作队列是LinkedBlockingQueue,而maximumPoolSizecorePoolSize相等。

  3. 使用SynchronousQueue队列时由于该队列没有容量的特性(特点:内部没有容器,一个生产线程,当它生产产品(即put的时候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品,队列操作时offer为非阻塞,take为阻塞,put也为阻塞操作),所以不会对任务进行排队,如果线程池中没有空闲线程,会立即创建一个新线程来接收这个任务,故而maximumPoolSize要设置大一点;

    比如:newCachedThreadPool中使用的是SynchronousQueue,任务到来,有空闲线程则使用空闲线程,无空闲线程则创建。所以称之为缓存线程,而它的maximumPoolSize设置的是Integer.MAX_VALUE

  4. 核心线程和最大线程数量相等时keepAliveTime无作用,因为只有当前运行线程数大于corePoolSize时,才会判断当有空闲线程时并且到了keepAliveTime的时间,终止线程。

  5. 线程池能处理的任务数 = corePoolSize + 阻塞队列容量 + 创建的工作线程(maximumPoolSize - corePoolSize ),比如corePoolSize 为2,阻塞队列为5,maximumPoolSize 为10,当前要创建的线程为16,则corePoolSize 先运行2个核心线程,剩下的放入阻塞队列5个,然后再创建(10-2)8个工作线程,总共运行线程是 2 + 5+ 8 = 15,剩下最后一个线程则由拒绝策略处理。

线程池源码解析

《线程池的使用方式》一文中了解了线程池的创建方式,这里开始分析线程池的执行方法execute

public void execute(Runnable command) {
    ......
    // 1.获取线程运行状态或数量
    int c = ctl.get();
    // 2.如果线程池线程数量小于核心线程数,则创建新线程
     if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
    // 3、如果线程数量大于等于corePoolSize,并且
    // 如果线程池是运行状态,则将线程添加到工作队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 4、再次检测线程是否是运行状态,因为执行入队操作后,线程有可能不是运行状态了
        // 如果线程池不在运行状态,则移除线程,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 5、如果线程是运行状态,线程数量是0,则开启新线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 6、如果添加worker失败,则执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

1、获取线程运行状态或数量;

2、判断工作线程数量如果小于核心线程,则开启核心线程,第二个参数传true表示是不是核心线程;

private boolean addWorker(Runnable firstTask, boolean core) 

3、如果线程数量大于等于corePoolSize,并且如果线程池是运行状态,则将线程添加到工作队列;这里有必须了解一下线程池的几种状态:

线程池状态管理

通过下面源码得知,线程池使用AtomicInteger类型(AtomicInteger是一个提供原子操作的Integer类,通过线程安全的方式操作加减,十分适合高并发情况下的使用)的ctl变量记录线程池的状态和线程池中的线程数量,初始化的时候传入ctlOf(RUNNING, 0)的执行结果,通过函数可以发现返回RUNNING的值,即111 00000000000000000000000000000,这里有32位,低 29 位表示线程数量, 29 位最大可以表示 (2^29)-1 (536870911)个线程数,,高 3 位用来表示线程池的状态,3 位可以表示 8 个线程池的状态,而线程池总共只有五个状态。

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ctl用了记录线程池的状态和线程池中的线程数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 32 - 3 = 29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 表示29左移1位,再减1,容量是1 1111 1111 1111 1111 1111 1111 1111,十进制是 2^29-1 = 536870911
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // RUNNING,-1左移29位 111 00000000000000000000000000000  29个0
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN,0左移29位,000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP,1左移29位 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING, 2左移29位 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED,3左移29位 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 线程状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 线程数量
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 初始化ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

移位运算符

<< : 左移运算符,num << n, 相当于num左移n位;

>> : 右移运算符,num >> n, 相当于num右移n位;

可以测试一下输出:

// 测试
int COUNT_BITS = Integer.SIZE - 3; // 32 - 3 = 29
Log.e(TAG, "COUNT_BITS: " + COUNT_BITS);
Log.e(TAG, "COUNT_BITS toBinaryString: " + Integer.toBinaryString(COUNT_BITS));
int CAPACITY = (1 << COUNT_BITS) - 1; // 1左移29位再减1,11111111111111111111111111111
Log.e(TAG, "CAPACITY: " + CAPACITY);
Log.e(TAG, "CAPACITY: " + Integer.toBinaryString(CAPACITY));
int RUNNING = -1 << COUNT_BITS; // -1左移29位 111 0 0000 0000 0000 0000 0000 0000 0000
Log.e(TAG, "RUNNING: " + Integer.toBinaryString(RUNNING));
int SHUTDOWN = 0 << COUNT_BITS; // 0 左移29位 000 0 0000 0000 0000 0000 0000 0000 0000
Log.e(TAG, "SHUTDOWN: " + Integer.toBinaryString(SHUTDOWN));
int STOP = 1 << COUNT_BITS; // 1左移29位 001 0 0000 0000 0000 0000 0000 0000 0000
Log.e(TAG, "STOP: " + Integer.toBinaryString(STOP));
int TIDYING = 2 << COUNT_BITS; // 2左移29位 010 0 0000 0000 0000 0000 0000 0000 0000
Log.e(TAG, "TIDYING: " + Integer.toBinaryString(TIDYING));
int TERMINATED = 3 << COUNT_BITS; // 3左移29位 011 0 0000 0000 0000 0000 0000 0000 0000
Log.e(TAG, "TERMINATED: " + Integer.toBinaryString(TERMINATED));

线程池状态

  • RUNNING:接受新任务并且处理阻塞队列里的任务;
  • SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务;
  • STOP:拒绝新任务并且抛弃阻塞队列里的任务,同时会中断正在处理的任务;
  • TIDYING:所有任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为 0,将要调用 terminated 方法;
  • TERMINATED:终止状态,terminated方法调用完成以后的状态。

线程池状态转换

​ 1.RUNNING -> SHUTDOWN:显式调用 shutdown() 方法,或者隐式调用了 finalize(),它里面调用了 shutdown() 方法。

​ 2.RUNNING or SHUTDOWN -> STOP:显式调用 shutdownNow() 方法时候。

​ 3.SHUTDOWN -> TIDYING:当线程池和任务队列都为空的时候。

​ 4.STOP -> TIDYING:当线程池为空的时候。

​ 5.TIDYING -> TERMINATED:当 terminated() 方法执行完成时候。

4、再次检测线程是否是运行状态,因为执行入队操作后,线程有可能不是运行状态了,如果线程池不在运行状态,则移除线程,并执行拒绝策略;

5、如果线程是运行状态,线程数量是0,则开启新线程;

6、如果添加worker失败,则执行拒绝策略,拒绝策略实际调用了RejectedExecutionHandlerrejectedExecution方法:

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

其实execute方法中最主要的是addWorker方法,下面分析一下Worker的处理流程:

线程池中的Worker

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker。Worker首先继承了AbstractQueuedSynchronizer父类(Java并发编程核心在于 java.concurrent.util 包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于 AbstractQueuedSynchronizer 简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器。子类们必须定义改变state变量的protected方法,这些方法定义了state是如何被获取或释放的),并实现了Runnable接口,实现了Runnable接口。

// 此处可以看出 worker 既是一个 Runnable 任务,也实现了 AQS(实际上是用 AQS 实现了一个独占锁,这样由于 worker 运行时会上锁,执行 shutdown,setCorePoolSize,setMaximumPoolSize等方法时会试着中断线程(interruptIdleWorkers) ,在这个方法中断方法中会先尝试获取 worker 的锁,如果不成功,说明 worker 在运行中,此时会先让 worker 执行完任务再关闭 worker 的线程,实现优雅关闭线程的目的)
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;

        // 实际执行任务的线程
        final Thread thread;
        // 如果当前线程数少于核心线程数,创建线程并将提交的任务交给 worker处理,此时 firstTask 即为此提交的任务,如果 worker 从 workQueue 中获取任务,则 firstTask 为空
        Runnable firstTask;
        // 统计完成的任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0
            setState(-1); 
            this.firstTask = firstTask;

            // 根据线程池的 threadFactory 创建一个线程,将 worker 本身传给线程(因为 worker 实现了 Runnable 接口)
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            // thread 启动后会调用此方法
            runWorker(this);
        }

       
        // 1 代表被锁住了,0 代表未锁
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 尝试获取锁
        protected boolean tryAcquire(int unused) {
            // 从这里可以看出它是一个独占锁,因为当获取锁后,cas 设置 state 不可能成功,这里我们也能明白上文中将 state 设置为 -1 的作用,这种情况下永远不可能获取得锁,而 worker 要被中断首先必须获取锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 尝试释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }    

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
            
        // 中断线程,这个方法会被 shutdowNow 调用,从中可以看出 shutdownNow 要中断线程不需要获取锁,也就是说如果线程正在运行,照样会给你中断掉,所以一般来说我们不用 shutdowNow 来中断线程,太粗暴了,中断时线程很可能在执行任务,影响任务执行。
        void interruptIfStarted() {
            Thread t;
            // 中断也是有条件的,必须是 state >= 0 且 t != null 且线程未被中断
            // 如果 state == -1 ,不执行中断,再次明白了为啥上文中 setState(-1) 的意义
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

总结:通过将线程封装为Worker,可以更好的管理线程和通过锁来中断线程。

下面看addWorker的工作流程:

private boolean addWorker(Runnable firstTask, boolean core) {
	// 1、retry标记
    retry:
    for (;;) {
        int c = ctl.get();

        // 获取线程池的状态
        int rs = runStateOf(c);

        // 如果线程池的状态 >= SHUTDOWN,即为 SHUTDOWN,STOP,TIDYING,TERMINATED 这四个状态,只有一种情况有可能创建线程,即线程状态为 SHUTDOWN, 且队列非空时,firstTask == null 代表创建一个不接收新任务的线程(此线程会从 workQueue 中获取任务再执行),这种情况下创建线程是为了加速处理完 workQueue 中的任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果超过了线程池的最大 CAPACITY(5 亿多,基本不可能)
            // 或者 超过了 corePoolSize(core 为 true) 或者 maximumPoolSize(core 为 false) 时
            // 则返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 2、校验通过后,增加线程的数量,如果成功跳出双重循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl

            // 如果线程运行状态发生变化,跳到外层循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // 说明是因为 CAS 增加线程数量失败所致,继续执行 retry 的内层循环
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 3、创建Worker,然后把线程任务添加进去
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 4、添加锁,是因为下文要把 w 添加进 workers 中, workers 是 HashSet,不是线程安全的,所以需要加锁予以保证
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //  再次 check 线程池的状态以防执行到此步时发生中断等
                int rs = runStateOf(ctl.get());
                // 如果线程池状态小于 SHUTDOWN(即为 RUNNING),
                // 或者状态为 SHUTDOWN 但 firstTask == null(代表不接收任务,只是创建线程处理 workQueue 中的任务),则满足添加 worker 的条件
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                  // 如果线程已启动,抛出异常,因为线程还没start
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    // 将Worker添加到HashSet中
                    workers.add(w);
                    int s = workers.size();

                    // 记录最大的线程池大小以作监控之用
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 设置添加状态
                    workerAdded = true;
                }
            } finally {
                // 解除独占锁
                mainLock.unlock();
            }

            //5、往 workers 中添加 worker 成功后,则启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 6、添加线程失败,执行 addWorkerFailed 方法
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法的逻辑中需要关注6点:

  1. retry就是一个标记,标记对一个循环方法的操作(continue和break)处理点,功能类似于goto,所以retry一般都是伴随着for循环出现,retry:标记的下一行就是for循环,在for循环里面调用continue(或者break)再紧接着retry标记时,就表示从这个地方开始执行continue(或者break)操作,
    在内层循环里面调用continue(或者break)后接着retry标识符,程序直接转到最外层for循环去处理了;

  2. 前面判断校验通过后,通过 compareAndIncrementWorkerCount函数增加线程的数量,如果成功跳出双重循环;

  3. 开始创建Worker,然后把线程任务添加进去,由之前的Worker构造源码可以知道,this.thread = getThreadFactory().newThread(this);这里的创建的thread是将Worker自己传进去的。

    Worker(Runnable firstTask) {
        // 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0
        setState(-1); 
        this.firstTask = firstTask;
    
        // 根据线程池的 threadFactory 创建一个线程,将 worker 本身传给线程(因为 worker 实现了 Runnable 接口)
        this.thread = getThreadFactory().newThread(this);
    }
    
  4. 添加锁,是因为下文要把 Worker添加进 workers 中, workers 是 HashSet,不是线程安全的,所以需要加锁予以保证,最后会在finally中解除锁;

    这里简单了解一下ReentrantLock

    java除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。

    ReentrantLock常常对比着synchronized来分析:

    (1)synchronized是独占锁,加锁和解锁的过程自动进行,易于操作,但不够灵活。ReentrantLock也是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。

    (2)synchronized可重入,因为加锁和解锁自动进行,不必担心最后是否释放锁;ReentrantLock也可重入,但加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。

    (3)synchronized不可响应中断,一个线程获取不到锁就一直等着;ReentrantLock可以相应中断。

    ReentrantLock好像比synchronized关键字没好太多,我们再去看看synchronized所没有的,一个最主要的就是ReentrantLock还可以实现公平锁机制。什么叫公平锁呢?也就是在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁。

  5. 往 workers 中添加 worker 成功后,则启动线程,由第3步知道,t就是Worker自己,所以调用t.start();的时候会执行Runnable中的run方法,也就是Worker的run方法,run方法中又调用了runWorker方法,下面会分析这个方法;

    public void run() {
        // thread 启动后会调用此方法
        runWorker(this);
    }
    
  6. 添加线程失败,执行 addWorkerFailed 方法

    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
    

    这个方法,再次使用锁来锁住workers,然后移除掉Worker,并且减少Worker线程数量和尝试停止线程池。

runWorker方法

从上文得知,最后调用了Worker的run方法,run方法又调用了runWorker方法,下面看一下该方法:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 释放锁,unlock会调用release(1)方法,release方法调用tryRelease方法,tryRelease 方法将 state设置成0,interruptIfStarted中允许中断的条件是state>=0
    w.unlock();
    boolean completedAbruptly = true;
    try {
        // 首先判断task != null表示如果在提交任务时创建了线程,并把任务丢给此线程,则会先执行此 task
        // 然后再循环调用getTask方法,从队列中获取task
        while (task != null || (task = getTask()) != null) {
            // 如果task存在,则上锁
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务前,子类可实现此钩子方法作为统计之用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行task的run方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                     // 执行任务后,子类可实现此钩子方法作为统计之用
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 完成任务数+1
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 当线程执行过程中异常了或者队列中task执行完毕后,开始执行该方法
        processWorkerExit(w, completedAbruptly);
    }
}

先看一下getTask方法

getTask方法

private Runnable getTask() {
    // 设置出队超时标志
    boolean timedOut = false; // Did the last poll() time out?
	// 使用for循环来执行
    for (;;) {
        // 获取线程池中线程数量
        int c = ctl.get();
        //获取线程池运行状态
        int rs = runStateOf(c);

        // 检测线程池状态是否停止或者工作队列是否为空,如果是则减少Worker计数,返回null
        // 此时会执行processWorkerExit方法,从而让获取此 Task 的 woker 退出
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
	
        int wc = workerCountOf(c);
	   // timed临时变量用于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
       // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
       // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
        // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
        // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,      
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            // CAS把线程数减去1失败会进入下一轮循环做重试
            continue;
        }

        try {
            // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 非null时候才返回,null的情况下会进入下一轮循环
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

这段逻辑大多数情况下是针对非核心线程。在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时,当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize。如果对于非核心线程,上一轮循环获取任务对象为null,这一轮循环很容易满足timed && timedOut为true,这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环,之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime的意义:

  • 当允许核心线程超时,也就是allowCoreThreadTimeOut设置为true的时候,此时keepAliveTime表示空闲的工作线程的存活周期。
  • 默认情况下不允许核心线程超时,此时keepAliveTime表示空闲的非核心线程的存活周期。

在一些特定的场景下,配置合理的keepAliveTime能够更好地利用线程池的工作线程资源。

再看一下processWorkerExit方法。

processWorkerExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果线程退出,则执行线程数减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 移除worker,回收线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
	// woker 既然异常退出,可能线程池状态变了(如执行 shutdown 等),尝试着关闭线程池
    tryTerminate();

    int c = ctl.get();
    // 如果线程池处于STOP状态,则如果 woker 是异常退出的,重新新增一个 woker,如果是正常退出的,在 wokerQueue 为非空的条件下,确保至少有一个线程在运行以执行 wokerQueue 中的任务
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 新增一个 woker
        addWorker(null, false);
    }
}

processWorkerExit()方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法后生命周期才算真正的终结)。

tryTerminate方法

每个线程的终结都会调用 tryTerminate()方法:

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 判断线程池的状态,如果是下面三种情况下的任意一种则直接返回:
        // 1.线程池处于RUNNING状态
        // 2.线程池至少为TIDYING状态,也就是TIDYING或者TERMINATED状态,意味着已经走到了下面的步骤,线程池即将终结
        // 3.线程池至少为STOP状态并且任务队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        // 工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated()
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // 最后更新线程池状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒阻塞在termination条件的所有线程,这个变量的await()方法在awaitTermination()中调用
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

// 中断空闲的工作线程,onlyOne为true的时候,只会中断工作线程集合中的某一个线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 这里判断线程不是中断状态并且尝试获取锁成功的时候才进行线程中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // 这里跳出循环,也就是只中断集合中第一个工作线程
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

这里有疑惑的地方是tryTerminate()方法的第二个if代码逻辑:工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程。方法API注释中有这样一段话:

If otherwise eligible to terminate but workerCount is nonzero, interrupts an idle worker to ensure that shutdown signals propagate.
当满足终结线程池的条件但是工作线程数不为0,这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。

下面将会分析的shutdown()方法中会通过interruptIdleWorkers()中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。

interruptIdleWorkers()方法中会对每一个工作线程先进行tryLock()判断,只有返回true才有可能进行线程中断。我们知道runWorker()方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。

shutdown方法源码分析

线程池关闭操作有几个相关的变体方法,先看shutdown()

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限校验,安全策略相关判断
        checkShutdownAccess();
        // 设置SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        // 中断所有的空闲的工作线程
        interruptIdleWorkers();
        // 钩子方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 调用上面分析的tryTerminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
    tryTerminate();
}

// 设置状态
private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        // 线程池状态至少为targetState或者CAS设置状态为targetState则跳出循环
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 中断所有的空闲的工作线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 中断所有空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 跟shutdownNow方法调用interruptWorkers⽅法不同的是,interruptIdleWorkers⽅法在遍历线程池里的线程时,有一个w.tryLock()加锁判断,只有加锁成功的线程才会被调用interrupt方法。这个锁就是runWork方法中的对线程加的锁
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

接着看shutdownNow()方法:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限校验,安全策略相关判断
        checkShutdownAccess();
        // 设置STOP状态
        advanceRunState(STOP);
        // 中断所有的工作线程
        interruptWorkers();
        // 清空工作队列并且取出所有的未执行的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    // 调用上面分析的tryTerminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
    tryTerminate();
    return tasks;
}

// 遍历所有的工作线程,如果state > 0(启动状态)则进行中断
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 遍历集合,调用worker的中断方法
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}

shutdownNow()方法会把线程池状态先更变为STOP,中断所有的工作线程(AbstractQueuedSynchronizerstate值大于0的Worker实例,也就是包括正在执行任务的Worker和空闲的Worker),然后遍历任务队列,取出(移除)所有任务存放在一个列表中返回。

再看awaitTermination()方法:

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    // 转换timeout的单位为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循环等待直到线程池状态更变为TERMINATED,每轮循环等待nanos纳秒
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}

awaitTermination()虽然不是shutdown()方法体系,但是它的处理逻辑就是确保调用此方法的线程会阻塞到tryTerminate()方法成功把线程池状态更新为TERMINATED后再返回,可以使用在某些需要感知线程池终结时刻的场景。

有一点值得关注的是:shutdown()方法只会中断空闲的工作线程,如果工作线程正在执行任务对象Runnable#run(),这种情况下的工作线程不会中断,而是等待下一轮执行getTask()方法的时候通过线程池状态判断正常终结该工作线程。

关闭线程池总结

1、对比一下shutdownNow()方法和shutdown()方法

当调用线程池的shutdownNow时,如果线程正在getTask方法中执⾏,则会通过for循环进入到if语句,于是getTask 返回null,从而线程退出,不管线程池里是否有未完成的任务;

如果线程因为执行提交到线程池里的任务而处于(锁住)阻塞状态,则会导致报错。(如果任务里没有捕获InterruptedException异常),否则线程会执行完当前任务,然后通过getTask方法返回为null来退出;

try {
    Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
    if (r != null)
        return r;
    timedOut = true;
  // getTask方法会报异常,这里会执行下一个循环
} catch (InterruptedException retry) {
    timedOut = false;
}

当调用线程池的``shutdown时,跟shutdownNow方法调用interruptWorkers⽅法不同的是,interruptIdleWorkers⽅法在遍历线程池里的线程时,有一个w.tryLock()加锁判断,只有加锁成功的线程才会被调用interrupt`方法,也就是runWorker方法里对任务执行时加的锁:

   final void runWorker(Worker w) {
        try {
            while (task != null || (task = getTask()) != null) {
                // 加锁
                w.lock()
                try {
                    // 执行run方法
                   task.run();
                } finally {
              		......
                    // 解锁
                    w.unlock();
                }
       		......
    }

2、优雅的关闭线程池

  • 使用shutdownNow⽅法,可能会引起报错,使用shutdown方法可能会导致线程关闭不了;
  • 当使用shutdownNow⽅法关闭线程池时,一定要对任务里进行异常捕获;
  • 当使用shuwdown方法关闭线程池时,一定要确保任务里不会有永久阻塞等待的逻辑,否则线程池就关闭不了;
  • 最后,一定要记得shutdownNow和shuwdown调用完,线程池并不是立刻就关闭了,要想等待线程池关闭,还需调用awaitTermination方法来阻塞等待。

线程池提交的两种方式

线程池有两种提交任务方式,分别是executesubmit

// 方式一:execute 方法
public void execute(Runnable command)

// 方式二: submit 的三个方法
public Future<?> submit(Runnable task)
public <T> Future<T> submit(Runnable task, T result)
public <T> Future<T> submit(Callable<T> task) 

从源码中可以看出两者的一个区别:execute无返回值,submit有返回值,返回Future对象。

线程池在submit的时候会将task包装成一个FutureTask,然后再调用execute方法,如下所示:

public Future<?> submit(Runnable task) {
	// 1、task包装成一个FutureTask
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    // 调用execute方法
    execute(ftask);
    return ftask;
}
//2、 new 一个FutureTask
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

// 3、FutureTask实现RunnableFuture接口
public class FutureTask<V> implements RunnableFuture<V> {}

// 4、RunnableFuture继承Future,所以最后返回Future对象
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}
    

下面简单了解一下Future

Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。

public interface Future<V> {
	// 用来停止一个任务,如果任务可以停止(通过mayInterruptIfRunning来进行判断),则可以返回true,如果任务已经完成或者已经停止,或者这个任务无法停止,则会返回false.
    boolean cancel(boolean mayInterruptIfRunning);
    // 判断当前方法是否取消
    boolean isCancelled();
    // 判断当前方法是否完成
    boolean isDone();
    // get方法可以当任务结束后返回一个结果,如果调用时,工作还没有结束,则会阻塞线程,直到任务执行完毕
    V get() throws InterruptedException, ExecutionException;
    // 多等待timeout的时间就会返回结果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

两者的第二个区别是,execute返回异常无法捕获,默认会执行 ThreadGroupuncaughtException 方法,或者通过继承afterExecute钩子函数,也可以获取运行异常。

ThreadGroup源码继承了Thread.UncaughtExceptionHandler接口,实现uncaughtException方法

public class ThreadGroup implements Thread.UncaughtExceptionHandler {
    
    public void uncaughtException(Thread t, Throwable e) {
        if (parent != null) {
            parent.uncaughtException(t, e);
        } else {
            Thread.UncaughtExceptionHandler ueh =
                Thread.getDefaultUncaughtExceptionHandler();
            if (ueh != null) {
                ueh.uncaughtException(t, e);
            } else if (!(e instanceof ThreadDeath)) {
                System.err.print("Exception in thread ""
                                 + t.getName() + "" ");
                e.printStackTrace(System.err);
            }
        }
    }
}

除了默认的异常处理,可以通过自定义线程工厂来记录异常信息,通过设置setUncaughtExceptionHandler方法捕获异常,还可以修改自己的业务线程名前缀,如下所示:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
                0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), 
                new MyThreadFactory(), new ThreadPoolPolicy());

// 自定义线程工厂
 private static class MyThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        private final String myPrefix = "业务标识"; // 这里是打印的线程标识

        MyThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = myPrefix + "pool-" +
                    poolNumber.getAndIncrement() +
                    "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override
                public void uncaughtException(@NonNull Thread t, @NonNull Throwable e) {
                    // 统计异常信息
                    Log.e(TAG, "uncaughtException: " + t.getId());
                    Log.e(TAG, "uncaughtException: " + t.getName());
                    Log.e(TAG, "uncaughtException: " + e.getMessage());
                }
            });
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

而通过继承ThreadPoolExecutor自定义的线程池,也可以重写afterExecute钩子函数来捕获异常:

public class MyThreadPool extends ThreadPoolExecutor {

    public MyThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Log.e(TAG, "uncaughtException: " + t.getMessage());
    }
}

submit异常可以在get方法中捕获,源码如下:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    // 在get方法中将异常保存,然后抛出异常
    throw new ExecutionException((Throwable)x);
}

捕获异常:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
        0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1), new ThreadPoolPolicy());

Runnable task = new Runnable() {
    @Override
    public void run() {
        Date endTime = new Date();
        //Log.e(TAG, "任务执行时间: " + endTime);
        Log.e(TAG, "任务线程名: " + Thread.currentThread().getName());
    }
};
// 捕获异常
Future<?> future = threadPoolExecutor.submit(task);
try {
    future.get();
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

最后总结一下execute和submit的区别

  • execute无返回值,submit有返回值,返回Future对象;
  • execute返回异常无法捕获,默认会执行 ThreadGroup 的 uncaughtException 方法,submit异常需要在get方法中捕获;

这篇文章主在学习线程池原理和源码中的技能点,学习资源来自以下文献,强烈推荐阅读以下优质文章:

硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理

Java线程池实现原理及其在美团业务中的实践

线程池,看完这篇真的够了

有的线程它死了,于是它变成一道面试题

线程池之ThreadPoolExecutor线程池源码分析笔记

线程池的基本原理,看完就懂了

面试刷题36:线程池的原理和使用方法?

线程池原理(面试)

Java并发编程:线程池的使用

最后

以上就是精明热狗为你收集整理的Java线程池原理解析的全部内容,希望文章能够帮你解决Java线程池原理解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(60)

评论列表共有 0 条评论

立即
投稿
返回
顶部