概述
第一层:初见阻塞队列BlockingQueue:workQueue、任务载入方法Worker.addWorker()
第一层的思考:
- corePoolSize BlockingQueue 如何直接参与调度
- 出现了的Worker是什么
- 为什么没有出现maximumPoolSize
- workerCountOf 获取的活跃线程数的定义是什么
public void execute(Runnable command) {
//线程池指标: 获取线程池状态 + 活跃线程数 (使用二进制位标识)
int c = ctl.get();
// 活跃线程数 < corePoolSize 调用任务载入方法 Worker.addWorker(Runnable r, boolean core)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 【注1】参数为true,标识添加的任务为核心任务
// 载入成功就【直接返回】
return;
// 载入失败就刷新线程池指标
c = ctl.get();
}
// 活跃线程数 > corePoolSize, 将任务添加至阻塞队列 BlockingQueue 成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 添加任务成功后再次监测线程池状态
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 活跃线程数 > corePoolSize 并且添加阻塞队列 失败, 原因有可能是阻塞队列已满
else if (!addWorker(command, false)) // 添加非核心任务
reject(command);
}
第二层:Worker.addWorker()真正创建了线程
【解答】第一层的思考:
- corePoolSize BlockingQueue 如何直接参与调度
当 活跃线程数 < corePoolSize, 新增一个【core = true】的任务会被直接addWorker(任务),不参与BlockingQueue的逻辑
当 活跃线程数 > corePoolSize
—> 添加至 BlockingQueue 成功:不特殊处理
—> 添加至 BlockingQueue 失败:新增一个【core = false】的任务会被直接addWorker(任务) - 出现了的Worker是什么
ThreadPoolExecutor.Worker
ThreadPoolExecutor 的内部类
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker
,是封装了 Runnerable 的一个线程安全类 - 为什么没有出现maximumPoolSize
maximumPoolSize 参与的逻辑隐含在addWorker(Runnable r, boolean core) 第二个参数 core中,见下文分析
第二层代码中有语句:wc >= (core ? corePoolSize : maximumPoolSize))
- workerCountOf 获取的活跃线程数的定义是什么 : 正在执行任务的线程
核心线程数 不等于 活跃线程数,因为核心线程也有可能阻塞等待任务。
被阻塞的线程不属于活跃线程数
关于阻塞的原理会出现在第四层。
第二层的目标:
- addWorker(Runnerable r, boolean core)具体做了什么事,与线程有什么关联
- workes 容器是什么
// 将源码精简成以下逻辑
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // 死循环直至符合添加条件才往下走逻辑
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
for (;;) {
int wc = workerCountOf(c);
// maximumPoolSize出现了, 外层传入core == true, 说明活跃线程数的上限由 corePoolSize决定
// 这个布尔值维护了两种异常情况:添加核心线程是否异常,添加至最大线程数是否异常
// 传入 core == false ,说明活跃线程的是上限由 maximumPoolSize 决定
/* 【重要】当外层 活跃线程数 > corePoolSize
添加至 BlockingQueue 失败:新增的【core = false】任务会被直接addWorker(任务)
也就是以上情况会新建一个线程“插队”执行
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
// 新建线程的逻辑:
try {
w = new Worker(firstTask); // 加入的Runnable 被封装成了 Worker
// 【重点】见下一个代码区 构造方法Worker(Runnable firstTask)
final Thread t = w.thread;
workers.add(w); // 要区分workers 和 workerQueue 的区别, worker是个HashSet 暂时只看到是用来统计指标的
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s; // 刷新workers 数量,实时同步监控指标
workerAdded = true;
}
} ...
if (workerAdded) {
t.start(); // 启动封装而成的Runnable, 所以说每次addWorker(Runnable)就会开启一个线程
workerStarted = true;
}
return workerStarted;
}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // Worker给构建出来就会新建一个线程
}
.
第三层:t.start() 的具体执行逻辑
【解答】第二层的思考:
- addWorker(Runnerable r, boolean core)具体做了什么事,与线程有什么关联
addWorker 无论core的取值为如何,都是为新增的一个Runnable开启一个线程,
core的取值只是用来维护两种异常情况
wc >= (core ? corePoolSize : maximumPoolSize)
上层添加的是核心线程,则判断活跃线程数是否大于核心线程
上层添加的是非核心线程,则判断活跃线程数是否大于最大线程数,任意异常则return false - workes 容器是什么
private final HashSet<Worker> workers = new HashSet<Worker>();
与线程调度没什么关系,暂时看到出现在线程池指标统计里
第三层的目标:
- t.start()的内部实现是什么
- getTask是否联系了 BlockingQueue : workeQueue
// Worker.run()
public void run() {
runWorker(this);
}
// ThreadPoolExecutor.runnWorker(Worker w),留下核心逻辑:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
// 【重点】不断尝试获取任务,获取到了就执行
while (task != null || (task = getTask()) != null) {
try {
task.run(); // 先优先处理传入的Runnable, 如果getTask()获取到任务也执行
} finally {
w.completedTasks++;
}
}
completedAbruptly = false;
} finally {
// 出现异常或者无任务时会销毁传入的Worker -> workers.remove(w)
processWorkerExit(w, completedAbruptly);
}
}
.
第四层:getTask()的具体实现
【解答】第三层的思考:
- t.start()的具体实现, 由于final Thread t = w.thread;
t.start() -> Worker.run() -> ThreadPoolExecutor.runWorker(this)
runWorker
的具体逻辑是:- 添加任务 -> 触发新线程 -> 该线程优先处理添加的任务firstTask
- 处理完
firstTask
,开始用getTask()
获取其他任务 (不是处理完自己的任务就销毁了) - 每次任务都会使用上述的逻辑,所以新增的线程越多,能并发处理的任务数越高
- getTask() 是否联系了 BlockingQueue : workeQueue
是,通过BlockingQueue 让任务和执行任务这个动作解耦, 让所有线程能够去抢占 workeQueue 里的任务
第四层的目标:
- getTask如何调度阻塞队列 workeQueue 里的任务
// 只保留核心逻辑
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
重点要研究BlockingQueue类
中的.poll(long timeout, TimeUnit unit)
和 take()
方法
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
结论:
take()
是获取BlockingQueue中的元素,如果队列为空,就一直等待 所以,take 是实现阻塞的核心
poll(long timeout, TimeUnit unit)
是在单位时间内获取元素,如果获取不到就返回NULL
如何调度:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
-
如果 【活跃线程数 < corePoolSize】, 所有核心线程阻塞得获取任务, 回到第三层看代码
核心线程能保活的关键:getTask() -> workQueue.take() 阻塞在此 while (task != null || (task = getTask()) != null) { ... }
线程池创建的核心线程完成了自身任务后,底层通过workQueue.take()实现不销毁线程
-
如果 【活跃线程数 > corePoolSize】, 开始不阻塞得获取任务,获取不到就让线程执行完毕即销毁
线程池会最多只保留 corePoolSize 数量的核心线程不主动销毁
也就解释了当 【maximumPoolSize > 活跃线程数 > corePoolSize】,要直接创建一个线程去执行任务不用担心这个线程不销毁
既能保证及时完成任务,又不占用阻塞队列,属于并发编程的一种艺术。
如果 【maximumPoolSize = 活跃线程数 , 阻塞队列未满】添加进阻塞队列,
【maximumPoolSize = 活跃线程数 , 阻塞队列已满】第一层的reject(command)
;会起作用,这个可以自定义,默认是抛异常 -
值得一提的
timed
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
executor.allowsCoreThreadTimeOut(); 线程池提供了方法配置,可以让核心线程执行完任务后销毁
总结
-
职责划分:
Worker(Runnable r,boolean core)
任务,将任务封装为线程安全的类Worker.addWorker()
创建线程runWorker()
任务执行 ,核心:while (task != null || (task =getTask()
) != null){}
–》runWorker()
用可以阻塞的循环保证核心线程不被销毁,并可以让一个被创造出来的线程执行多个任务
–》task.run()
每取到一个任务,单独执行run()方法,省去了另外开辟线程的开销BlockingQueue
把任务 、 任务执行 解耦,使创建出来的线程不再单一执行一次任务。
–》poll()
超过核心线程数 取不到任务就不等了,返回给runWorker()
结束循环,让线程消亡
–》take()
阻塞获取任务,阻塞队列为空也会等待,让核心线程保持存活。 -
逻辑扭转:
- 【活跃线程 <= corePoolSize】 : 一个任务创建一个线程
Worker.addWorker(r, true)
,执行完成任务后不销毁,用于竞争其他任务的执行权 - 【maximumPoolSize > 活跃线程 > corePoolSize】 :
2.1workQueue
队列未满,加入阻塞队列workQueue
,让核心线程去竞争执行权
2.2workQueue
队列已满,Worker.addWorker(r, false)
创建线程并执行r,并淘汰一个核心线程(保持核心线程数不变) - 【maximumPoolSize = 活跃线程 】 :
3.1workQueue
队列未满,加入阻塞队列workQueue
,让核心线程去竞争执行权
3.2workQueue
队列已满,reject()
使用拒绝策略,默认是抛异常
- 【活跃线程 <= corePoolSize】 : 一个任务创建一个线程
-
可能存在的思考误区 :
-
线程池能如何区分核心线程和非核心线程
答案是:不区分
代码从始至终只有addWorker(r, true / false)
与核心 / 非核心 关联,但是提供的是两种异常处理的监控
传入true
,当前线程池的数量 > corePoolSize就抛异常,因为上层代码逻辑是要增加线程数至 <= corePoolSize
传入false
,当前线程池的数量 > maximumPoolSize就抛异常, 因为上层代码逻辑是增加线程数至 <= maximumPoolSize
真正维护核心线程数的逻辑是:boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
一旦活跃线程数 > corePoolSize。就会随机得选线程执行
poll(keepAliveTime, TimeUnit.NANOSECONDS)
无论是否取到任务,都会让被选中的多余线程自然消亡,达到维持核心线程数的作用。
所以,核心线程并不是被线程池标记,线程池只维护corePoolSize数量的线程去竞争任务的执行权
-
最后
以上就是谦让香氛为你收集整理的【java_基础深入】ThreadPoolExecutor.execute() 源码分析的全部内容,希望文章能够帮你解决【java_基础深入】ThreadPoolExecutor.execute() 源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复