概述
一、构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
二、线程池核心参数
corePoolSize:核心线程池的大小,如果核心线程池有空闲位置,这时新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程;
maximumPoolSize:线程池能创建最大的线程数量。如果核心线程池和缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否侧会采取拒绝接受任务策略,我们下面会具体分析拒绝策略。
keepAliveTime:非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用。
unit:时间单位,和keepAliveTime配合使用。
workQueue:缓存队列,用来存放等待被执行的任务。
//阻塞队列
BlockingQueue<Runnable> workQueue = null;
workQueue = new ArrayBlockingQueue<>(5);//基于数组的先进先出队列,有界
workQueue = new LinkedBlockingQueue<>();//基于链表的先进先出队列,无界
workQueue = new SynchronousQueue<>();//无缓冲的等待队列,无界
ArrayBlockingQueue中的锁是没有分离的,即生产和消费用的是同一个锁; 使用一个ReentrantLock来保证线程安全:入列和出列前都需要获取该锁。
LinkedBlockingQueue中的锁是分离的,使用两个ReentrantLock来保证线程安全:入列前需要获取到入列锁(putLock),出列前需要获取到出列锁(takeLock),实现了入列锁和出列锁的分离,并且LinkedBlockingQueue由于无界,所以offer以及poll的吞吐量通常比ArrayBlockingQueue要高
threadFactory:线程工厂,用来创建线程。
handler:拒绝任务策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。(存在风险,数据丢失)
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)(存在风险,数据丢失)
ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程处理该任务,也就是谁提交,谁负责执行,此时主线程将在一段时间内不能提交任何任务(自己执行任务,线程被占用),在此期间,线程池也会利用这段时间消费队列的任务,相当于给线程池一定的缓冲期。
三、线程复用原理
1、首先看下提交task任务方法中 addWorker(command, true)方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2、addWorker(command, true)方法中调用t.start方法时,会调用runWorker(this);
//线程启动的时候
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = null;
w = new Worker(firstTask);
final Thread t = w.thread;
...//代码
t.start();
...//代码
}
------------------------------------------------------------------
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
// worker内部类
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// new一个工作线程
this.thread = getThreadFactory().newThread(this);
}
// 重写run方法
@override
public void run() {
runWorker(this);
}
}
3、具体的runWorker方法中while循环 不断从getTask队列中获取task,这块就是线程复用原理
//2,继续看里面的runWorker(this);很显然this是内部类Work对象本身
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
../代码
try {
while (task != null || (task = getTask()) != null) {
try{
../代码
task.run();
}catch(){
../代码
} finally {
../代码
}
}
../代码
} finally {
// 从线程池回收线程
processWorkerExit(w, completedAbruptly);
}
}
4、从getTask队列中获取task
private Runnable getTask() {
...//代码
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 从队列中获取runnable任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在processWorkerExit方法中有进行worker线程移除 workers.remove(w);
四、关闭线程池
shutdown:当线程池调用该方法时,线程池的状态立刻变为SHUTDOWN状态,已提交的任务会被继续执行,而新提交的任务会像线程池饱和时那样被拒绝掉。
shutdownNow:执行该方法,拒绝接收新提交的任务,
(1)线程池的状态立即变为STOP,
(2)并试图阻止所有正在执行的线程,
(3)不再处理还在线程池队列中等待的任务,当然,它会返回那些已提交但是未被执行的任务列表。
shutdownNow内部是通过调用工作者线程的interrupt方法来停止正在执行的任务的,但是这种方法的作用有限,如果线程中没有sleep、wait、Condition、定时锁等应用,interrupt是无法中断当前线程的。所以,shutdownNow并不代表线程池一定会立刻退出,它可能需要等待所有正在执行的任务都执行完毕才会退出。反过来说,在关闭线程池的时候如果我们能够确保已经提交的任务都已执行完毕并且没有新的任务会被提交,那么调用shutdownNow总是安全可靠的。
只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。
至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。
最大的区别是:shutdown是会执行完所有的任务,shutdownnow是可能会执行完正在执行的任务,但是一定会抛弃等待中的任务.
最后
以上就是唠叨眼神为你收集整理的ThreadPoolExecutor线程池以及线程复用原理的全部内容,希望文章能够帮你解决ThreadPoolExecutor线程池以及线程复用原理所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复