我们介绍线程池以最关键的类ThreadPoolExecutor讲起。
一、继承关系图
二、ThreadPoolExecutor类介绍
1、相关属性
AtomicInteger ctl:原子整数,记录工人数量workerCount和线程池状态runState
Boolean terminate:表示线程池关闭瞬间的状态。当线程池执行stop或stopNow方法时,此时程序还没有完全终止,isTerminating()为true,否则为false;当线程关闭结束isTerminated()为true。
int corePoolSize:核心线程池数量
int maximumPoolSize:最大线程池数量
long keepAliveTime:线程空闲时间
TimeUnit unit:时间单位
BlockingQueue<Runnable> workQueue:任务队列
ThreadFactory threadFactory:创建线程的线程工厂
RejectedExecutionHandler handler:拒绝策略处理器
int poolSize:当前线程池中线程数量
int activeCount:当前活跃线程数量
int largestPoolSize:线程池中出现过的最大线程数量
long taskCount:已分配处理的任务数量,是个约数
long completedTaskCount:已经处理完毕的任务数量,是个约数
2、线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
1.当创建线程池后,初始时,线程池处于RUNNING状态;
2.如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
3.如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
4.当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,转为TIDYING状态,此时所有任务都已终止,workerCount 为零,转换到状态 TIDYING 的线程将运行 terminate() 钩子方法。terminate()在源码中默认什么也不处理,留给子类去自定义处理。
5.线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING状态转换为TERMINATED状态,此时线程池完全关闭。
6.这里地设计很巧妙,使用原子整数类,ctlOf()方法时将两个数使用或逻辑来处理,两个参数分别是状态和当前工人数,用高位表示状态,低位表示工人数量。
3、构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
这里前三个构造器都显示地调用了第四个构造器,并对没有传入的参数设置了默认值。
4、相关方法
方法介绍:
execute:实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
shutdown:使线程池处于SHUTDOWN状态
shutdownNow:使线程池处于STOP状态
awaitTermination:在调用shutdown后,阻塞任务直到完成,除非超时或出现异常
prestartCoreThread:初始化线程池创建一个线程
prestartAllCoreThreads:初始化线程池创建核心线程数个线程
allowCoreThreadTimeOut:设置核心线程池也有空闲时间,默认没有
remove:从等待队列中删除一个任务
二、AbstractExecutorService抽象类
这个抽象类定义了一些任务的执行
三、ExecutorService接口
定义了执行任务应用的一些方法,上面AbstractExecutorService抽象类给出了该接口的默认实现
四、Executor接口
五、execute方法介绍
铺垫了那么多,现在我们开始进入正题,在进入正题之前一定要保证上面的内容弄清楚,不然可能看不下去了。
1、方法代码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
为方便理解我们一段一段地来解释代码
if (command == null)
throw new NullPointerException();
- 首先判断传过来的要执行的command是否为空,若为空,直接报空指针异常
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
- 之后获取原子类的值,高位状态,低位工人数,用workerCountOf( c )方法获取工人数,若工人数小于核心线程池数,则调用addWorker(command, true)方法执行任务,若执行成功,运行结束。下面我们来看看addWorker方法到底干了什么。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
retry表示break跳出的循环。
同样地,我们将它拆开来看。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
- addWorker方法首先进入一个无条件循环,获得线程池运行状态,判断,如果状态码大于SHUTDOWN的状态码直接返回false,阻止执行任务,注意正常情况下(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())一定为false,因为我们传过来的firstTask参数在execute方法中判断过,不为空,那么为什么还加一个判断呢,这个是一个铺垫,我提前说了,下面我也会介绍,这是因为有时可能会出现我们关闭所有线程了,线程池状态处于shutdown状态,但是还有关闭一瞬间添加进来的任务,我们需要处理它,所以这里会有第二个条件。之后进入下一个无条件循环获取工人数量,如果工人数量大于工人容量,或者大于核心线程池数量(core参数传进来就是true),返回false拒绝执行。之后使用CAS自旋添加一个工人,并退出本次循环,若自旋失败,判断状态是否发生改变,若改变重新执行外循环,否则一直内循环直到自旋成功。
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
- 首先定义了两个变量,代表工作是否启动和工作是否添加,首先创建一个工人对象,那么工人对象到底是什么呢?我们接下来看一看
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker类组装了待执行的任务和执行任务的线程,他们俩构成了Worker。注意这里构造函数中将自己传给thread,方便日后调用start方法,会执行Worker自己的run方法。
Worker结构如下
这里包括一些继承过来的简单的加锁和解锁方法,最重要的时run方法,下面来介绍,interruptIfStarted方法是打断线程执行的方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
孩子太累了,我是写到后面才发现run方法没写/(ㄒoㄒ)/~~,又给补上的,今天一天啥也没干,光看线程池源码了,累死孩子了,大家走过路过留个赞吧(っ´Ι`)っ。写不动了,改天再写吧,记住还差一个run方法,里面有两点比较重要,一个是getTask()方法,一个是processWorkerExit(w, completedAbruptly)方法。
我来啦,方法最开始做了一些初始化方法包括解锁,允许线程被interrupt,之后就循环从队列中获得任务,这里有一个超长的判断,那么它是什么意思呢
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
首先如果线程池调用ShutdownNow方法,线程池状态处于STOP,这时候我们就要使线程状态为interrupt,!wt.isInterrupted()就是判断wt线程状态不为打断状态,我们就设置它为打断状态。但是如果线程池状态不为STOP,我们就要保证线程状态就不能是打断状态,那么我们如何保证呢,我们调用了Thread.interrupted方法,这个方法返回线程是否打断状态并将打断状态重置,也就是说如果线程状态为打断状态,调用了这个方法之后,线程状态就会转换为非打断状态。
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
执行前置方法,运行run方法,执行后置方法(在JDK源码中这两个方法并没有提供具体的实现,是留给子类去根据自己情况来完善的),完成任务数加一,这里还解释了问什么官方文档中说completedTasks是一个约数,因为在执行过程中可能出错,如果任务出错,我们不应该将任务数加一。如果在循环中没有拿到任务,就会退出循环并将completedAbruptly设置为false,代表线程由于没有拿到任务而退出,而不是因为线程池关闭而退出,最后执行退出方法,下面我们来介绍getTask方法,它是怎么从队列中获取任务的。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
// 检索并删除队列头,如果等待时间没有拿到,返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 检索并删除队列头不等待
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先设置超时时间为不超时,之后进入一个无条件循环,之后获取线程池状态当状态为STOP,直接返回null,若状态为SHOTDOWN且队列是null,将工人数CAS自旋减一,返回null。
之后获取工人数,若设置核心线程拥有超时时间或者线程数大于核心线程数,timed为true用来判断工人是否被淘汰。
如果工人可以被淘汰且超过超时时间,并且要保证在队列中的任务有工人可以处理,我们就将工人数自旋减一,返回null。
若工人不被淘汰,根据timed条件调用带时间和不带时间的等待方法。
如果拿到了,就返回,没拿到就当作超时(因为还可能是调用take方法返回null,这时就不会被淘汰,因为核心线程不超时不被淘汰),设置timedOut为true,进入下一次循环。这个方法介绍完毕,这里挖个坑吧,有时间把几种队列介绍以下一共有以下几种
下面来介绍processWorkerExit方法。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
首先判断线程是否由于没有拿到任务而退出,如果没有拿到任务而退出我们之前介绍过,它已经把工人数量减一了,如果因为错误而退出,这里就需要将工人数量减一。之后加锁,将当前工人所完成的任务数加入到总任务数中并将当前任务从工作集中删除(这是个无底洞啊),tryTerminate方法判断是否有可能将线程池状态转换为TERMINATED状态,如果有资格终止但 workerCount 非零,则中断空闲的工作程序以确保关闭信号传播。
首先计算最小工人数量,若当前工人数量大于最小工人数量则不添加工人(因为我们刚刚删除了一个),否则添加一个工人。
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
- 上述条件满足说明可以创建一个新的线程来执行任务,所以它首先创建一个Worker工作,判断工作的属性thread是否为空,若为空或者在执行过程中报错进入到finally语句,workerStarted为false,addWorkerFailed将当前工作从工作集workers中移除。若thread不为空则加锁,若线程池状态为RUNNING则判断线程是否开启,我们刚刚创建线程,但是还没有让它start,如果它正在运行说明出错了,之后添加工作集并将工作添加状态设置为true。之后启动线程,并将工作是否开启设置为true,最后返回工作状态workerStarted。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
- 我们在回过头来看execute方法,当任务正常通过创建一个新的线程来执行没有问题就结束方法,若创建失败则有可能是线程池状态发生改变,我们要重新获取线程池状态并进入到下一条语句。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
- 如果线程池状态是正在运行且将任务加入到工作队列成功,由于没有加锁,很可能在添加队列的过程中线程池的状态被改变,或者所有线程同时死亡,所以要重新获取并判断一遍线程状态,若状态不对将command从队列中移除并调用一个拒绝方法,这里有两个重要注意点,一个是队列,一个是拒绝策略,我们之后介绍。如果不拒绝command,但是工人数量为0,说明此时线程池正没有空闲的工人来处理任务,这时候就需要创建一个工人来处理刚刚进入队列的command。
else if (!addWorker(command, false))
reject(command);
- 如果队列添加失败,就尝试再次开启一个工人去执行,如果达到了最大工人数或者线程池已经关闭就会被拒绝。
哇偶,完结撒花★,°:.☆( ̄▽ ̄)/$:.°★ 。如果有机会我会接着介绍shutdown方法和拒绝策略,工作队列等内容。
最后
以上就是香蕉战斗机最近收集整理的关于线程池源码详解的全部内容,更多相关线程池源码详解内容请搜索靠谱客的其他文章。
发表评论 取消回复