我是靠谱客的博主 谦让香氛,最近开发中收集的这篇文章主要介绍【java_基础深入】ThreadPoolExecutor.execute() 源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

第一层:初见阻塞队列BlockingQueue:workQueue、任务载入方法Worker.addWorker()

第一层的思考:

  1. corePoolSize BlockingQueue 如何直接参与调度
  2. 出现了的Worker是什么
  3. 为什么没有出现maximumPoolSize
  4. workerCountOf 获取的活跃线程数的定义是什么
	public void execute(Runnable command) {
		//线程池指标: 获取线程池状态 + 活跃线程数 (使用二进制位标识)
        int c = ctl.get();  
        
        // 活跃线程数 < corePoolSize 调用任务载入方法 Worker.addWorker(Runnable r, boolean core)
        if (workerCountOf(c) < corePoolSize) { 
            if (addWorker(command, true))  // 【注1】参数为true,标识添加的任务为核心任务
            	// 载入成功就【直接返回】
                return; 
           // 载入失败就刷新线程池指标     
            c = ctl.get();
        }
        
        // 活跃线程数 > corePoolSize, 将任务添加至阻塞队列 BlockingQueue 成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 添加任务成功后再次监测线程池状态
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 活跃线程数 > corePoolSize 并且添加阻塞队列 失败, 原因有可能是阻塞队列已满
        else if (!addWorker(command, false)) // 添加非核心任务
            reject(command);
    }

第二层:Worker.addWorker()真正创建了线程

【解答】第一层的思考:

  1. corePoolSize BlockingQueue 如何直接参与调度
    当 活跃线程数 < corePoolSize, 新增一个【core = true】的任务会被直接addWorker(任务),不参与BlockingQueue的逻辑
    当 活跃线程数 > corePoolSize
    —> 添加至 BlockingQueue 成功:不特殊处理
    —> 添加至 BlockingQueue 失败:新增一个【core = false】的任务会被直接addWorker(任务)
  2. 出现了的Worker是什么
    ThreadPoolExecutor.Worker ThreadPoolExecutor 的内部类
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    Worker ,是封装了 Runnerable 的一个线程安全类
  3. 为什么没有出现maximumPoolSize
    maximumPoolSize 参与的逻辑隐含在addWorker(Runnable r, boolean core) 第二个参数 core中,见下文分析
    第二层代码中有语句:wc >= (core ? corePoolSize : maximumPoolSize))
  4. workerCountOf 获取的活跃线程数的定义是什么 : 正在执行任务的线程
    核心线程数 不等于 活跃线程数,因为核心线程也有可能阻塞等待任务。
    被阻塞的线程不属于活跃线程数
    关于阻塞的原理会出现在第四层。

第二层的目标:

  1. addWorker(Runnerable r, boolean core)具体做了什么事,与线程有什么关联
  2. workes 容器是什么
	// 将源码精简成以下逻辑
	private boolean addWorker(Runnable firstTask, boolean core) {
        retry: // 死循环直至符合添加条件才往下走逻辑
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            for (;;) {
                int wc = workerCountOf(c);
                // maximumPoolSize出现了, 外层传入core == true, 说明活跃线程数的上限由 corePoolSize决定
                // 这个布尔值维护了两种异常情况:添加核心线程是否异常,添加至最大线程数是否异常
                // 传入 core == false ,说明活跃线程的是上限由 maximumPoolSize 决定
                /* 【重要】当外层 活跃线程数 > corePoolSize 
                 	添加至 BlockingQueue  失败:新增的【core = false】任务会被直接addWorker(任务)
                 	也就是以上情况会新建一个线程“插队”执行
                 */
                if (wc >= CAPACITY ||  wc >= (core ? corePoolSize : maximumPoolSize))  
                	return false;
                if (compareAndIncrementWorkerCount(c)) 
                	break retry;
                c = ctl.get(); 
                if (runStateOf(c) != rs)
                	 continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null; 
        // 新建线程的逻辑: 
        try {
       		 w = new Worker(firstTask); // 加入的Runnable 被封装成了 Worker
       		 // 【重点】见下一个代码区 构造方法Worker(Runnable firstTask)
             final Thread t = w.thread;
             workers.add(w);  // 要区分workers 和 workerQueue 的区别, worker是个HashSet 暂时只看到是用来统计指标的
             int s = workers.size();
             if (s > largestPoolSize) {
             	largestPoolSize = s; // 刷新workers 数量,实时同步监控指标
                workerAdded = true;
             }
         } ...
 
        if (workerAdded) {
            t.start(); // 启动封装而成的Runnable, 所以说每次addWorker(Runnable)就会开启一个线程
            workerStarted = true;
        }
        return workerStarted;
    }
Worker(Runnable firstTask) {
     setState(-1); // inhibit interrupts until runWorker
     this.firstTask = firstTask;
     this.thread = getThreadFactory().newThread(this); // Worker给构建出来就会新建一个线程
}

.


第三层:t.start() 的具体执行逻辑

【解答】第二层的思考:

  1. addWorker(Runnerable r, boolean core)具体做了什么事,与线程有什么关联
    addWorker 无论core的取值为如何,都是为新增的一个Runnable开启一个线程,
    core的取值只是用来维护两种异常情况
    wc >= (core ? corePoolSize : maximumPoolSize)
    上层添加的是核心线程,则判断活跃线程数是否大于核心线程
    上层添加的是非核心线程,则判断活跃线程数是否大于最大线程数,任意异常则return false
  2. workes 容器是什么
    private final HashSet<Worker> workers = new HashSet<Worker>();
    与线程调度没什么关系,暂时看到出现在线程池指标统计里
    在这里插入图片描述

第三层的目标:

  1. t.start()的内部实现是什么
  2. getTask是否联系了 BlockingQueue : workeQueue
	// Worker.run()
    public void run() {
        runWorker(this);
    }
    // ThreadPoolExecutor.runnWorker(Worker w),留下核心逻辑:
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;
        try {
       		// 【重点】不断尝试获取任务,获取到了就执行
            while (task != null || (task = getTask()) != null) { 
                try {
                    task.run(); // 先优先处理传入的Runnable, 如果getTask()获取到任务也执行
                } finally {
                    w.completedTasks++;
                }
            }
            completedAbruptly = false;
        } finally {
        	// 出现异常或者无任务时会销毁传入的Worker -> workers.remove(w)
            processWorkerExit(w, completedAbruptly); 
        }
    } 

.


第四层:getTask()的具体实现

【解答】第三层的思考:

  1. t.start()的具体实现, 由于final Thread t = w.thread;
    t.start() -> Worker.run() -> ThreadPoolExecutor.runWorker(this)
    runWorker的具体逻辑是:
    1. 添加任务 -> 触发新线程 -> 该线程优先处理添加的任务firstTask
    2. 处理完firstTask,开始用getTask()获取其他任务 (不是处理完自己的任务就销毁了
    3. 每次任务都会使用上述的逻辑,所以新增的线程越多,能并发处理的任务数越高
  2. getTask() 是否联系了 BlockingQueue : workeQueue
    是,通过BlockingQueue 让任务和执行任务这个动作解耦, 让所有线程能够去抢占 workeQueue 里的任务

第四层的目标:

  1. getTask如何调度阻塞队列 workeQueue 里的任务
	// 只保留核心逻辑
	private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

重点要研究BlockingQueue类中的.poll(long timeout, TimeUnit unit)take() 方法

	/**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     */
    E take() throws InterruptedException;

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     */
    E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

结论:
take()是获取BlockingQueue中的元素,如果队列为空,就一直等待
所以,take 是实现阻塞的核心
poll(long timeout, TimeUnit unit) 是在单位时间内获取元素,如果获取不到就返回NULL

如何调度:

 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 Runnable r = timed ?
                   workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   workQueue.take();
  • 如果 【活跃线程数 < corePoolSize】, 所有核心线程阻塞得获取任务, 回到第三层看代码

    核心线程能保活的关键:getTask() -> workQueue.take() 阻塞在此
    while (task != null || (task = getTask()) != null) { ... } 
    

    线程池创建的核心线程完成了自身任务后,底层通过workQueue.take()实现不销毁线程

  • 如果 【活跃线程数 > corePoolSize】, 开始不阻塞得获取任务,获取不到就让线程执行完毕即销毁
    线程池会最多只保留 corePoolSize 数量的核心线程不主动销毁
    也就解释了当 【maximumPoolSize > 活跃线程数 > corePoolSize】,要直接创建一个线程去执行任务不用担心这个线程不销毁
    既能保证及时完成任务,又不占用阻塞队列,属于并发编程的一种艺术。
    如果 【maximumPoolSize = 活跃线程数 , 阻塞队列未满】添加进阻塞队列,
    maximumPoolSize = 活跃线程数 , 阻塞队列已满】第一层的reject(command);会起作用,这个可以自定义,默认是抛异常

  • 值得一提的timed
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    executor.allowsCoreThreadTimeOut(); 线程池提供了方法配置,可以让核心线程执行完任务后销毁


总结

  • 职责划分:

    Worker(Runnable r,boolean core) 任务,将任务封装为线程安全的类

    Worker.addWorker() 创建线程

    runWorker() 任务执行 ,核心:while (task != null || (task = getTask()) != null){}
    –》 runWorker() 用可以阻塞的循环保证核心线程不被销毁,并可以让一个被创造出来的线程执行多个任务
    –》task.run() 每取到一个任务,单独执行run()方法,省去了另外开辟线程的开销

    BlockingQueue任务任务执行 解耦,使创建出来的线程不再单一执行一次任务
    –》poll() 超过核心线程数 取不到任务就不等了,返回给runWorker()结束循环,让线程消亡
    –》take() 阻塞获取任务,阻塞队列为空也会等待,让核心线程保持存活。

  • 逻辑扭转:
    1. 【活跃线程 <= corePoolSize】 : 一个任务创建一个线程Worker.addWorker(r, true),执行完成任务后不销毁,用于竞争其他任务的执行权
    2. 【maximumPoolSize > 活跃线程 > corePoolSize】 :
      2.1 workQueue队列未满,加入阻塞队列workQueue,让核心线程去竞争执行权
      2.2 workQueue队列已满,Worker.addWorker(r, false) 创建线程并执行r,并淘汰一个核心线程(保持核心线程数不变)
    3. 【maximumPoolSize = 活跃线程 】 :
      3.1 workQueue队列未满,加入阻塞队列workQueue,让核心线程去竞争执行权
      3.2 workQueue队列已满,reject()使用拒绝策略,默认是抛异常
  • 可能存在的思考误区 :
    1. 线程池能如何区分核心线程和非核心线程
      答案是:不区分
      代码从始至终只有 addWorker(r, true / false) 与核心 / 非核心 关联,但是提供的是两种异常处理的监控
      传入true,当前线程池的数量 > corePoolSize就抛异常,因为上层代码逻辑是要增加线程数至 <= corePoolSize
      传入false,当前线程池的数量 > maximumPoolSize就抛异常, 因为上层代码逻辑是增加线程数至 <= maximumPoolSize
      真正维护核心线程数的逻辑是

      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      Runnable r = timed ?
                 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                 workQueue.take();
      

      一旦活跃线程数 > corePoolSize。就会随机得选线程执行poll(keepAliveTime, TimeUnit.NANOSECONDS)
      无论是否取到任务,都会让被选中的多余线程自然消亡,达到维持核心线程数的作用。
      所以,核心线程并不是被线程池标记,线程池只维护corePoolSize数量的线程去竞争任务的执行权

最后

以上就是谦让香氛为你收集整理的【java_基础深入】ThreadPoolExecutor.execute() 源码分析的全部内容,希望文章能够帮你解决【java_基础深入】ThreadPoolExecutor.execute() 源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(46)

评论列表共有 0 条评论

立即
投稿
返回
顶部