我是靠谱客的博主 魁梧毛衣,最近开发中收集的这篇文章主要介绍ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式,觉得挺不错的,现在分享给大家,希望可以做个参考。
概述
ThreadPoolExecutor
1 Java构建线程的方式
- 继承 Thread
- 实现 Runnable
- 实现 Callable
- 线程池方式
2 线程池的7个参数
int corePoolSize, //核心线程数
int maximumPoolSize, //最大线程数
long keepAliveTime, //最大空闲时间
TimeUnit unit, //时间单位
BlockingQueue<Runnable> workQueue, //阻塞队列
ThreadFactory threadFactory, //线程工厂
RejectedExecutionHandler handler //拒绝策略
3 线程池的执行流程
4 线程池属性标识
4.1 线程池属性
// 是一个int类型的数值,表示:
//1.声明当前线程池的状态(5种)
// 高3位:线程池状态 低29位:线程池线程个数
//2.声明线程池中的线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29,方便后面做位运算
private static final int COUNT_BITS = Integer.SIZE - 3;
//通过运算得出
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 111 代表正常接收处理任务
private static final int RUNNING = -1 << COUNT_BITS;
// 000 代表不接受新任务,但内部还处理已有任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// executor.shutdownNow 001 不接受新任务,内部任务不再处理,同时中断当前任务
private static final int STOP = 1 << COUNT_BITS;
// 010 过渡状态,线程池即将over
private static final int TIDYING = 2 << COUNT_BITS;
// 011 代表线程池已经结束
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 得到线程池的状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//得到线程池线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
4.2 线程池状态转换
5 execute方法执行
execute方法
public void execute(Runnable command){
//健壮性
if (command == null)
throw new NullPointerException();
// 拿到32位的int
int c = ctl.get();
//工作线程数 < 核心线程数,则创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 创建核心线程失败,重新获取ctl,32位的int
c = ctl.get();
}
// 如果处于RUNNING状态,则添加任务到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取 ctl
int recheck = ctl.get();
// 再次判断是否是RUNNING状态,不是则移除任务
if (! isRunning(recheck) && remove(command))
reject(command); //拒绝策略
//如果处于RUNNING状态,并且工作线程数为0
else if (workerCountOf(recheck) == 0)
//阻塞队列有任务但没有工作线程,则添加一个任务为空的工作线程处理阻塞队列中的任务
addWorker(null, false);
}
// 不处于RUNNING状态则创建非核心线程处理任务
else if (!addWorker(command, false))
//创建失败则拒绝任务
reject(command);
}
addWorker方法
/**
* 创建工作线程
* @param firstTask 传入任务
* @param core 需要创建的是否是核心线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 此for循环的目的是经过一些判断,给工作线程标识数+1
for (;;) {
// 获取32位int
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);
/* 可能创建成功的条件:
1. RUNNING下可能创建成功
2. SHUTDOWN状态下可以为阻塞队列任务创建工作线程
当传入firstTask为null时,会去处理阻塞队列的任务,
所以只有当 firstTask == null && !workQueue.isEmpty()时才会创建工作线程
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 无需创建工作线程
return false;
for (;;) {
// 获取工作线程数
int wc = workerCountOf(c);
/* 如果工作线程数大于线程池容量(固定容量)
或者需要创建核心线程,但核心线程数已满
或者工作线程数大于最大线程数(自定义,不超过容量)
则退出创建工作线程
*/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 工作线程数+1,CAS方式
if (compareAndIncrementWorkerCount(c))
break retry; //如果失败则结束外侧循环
c = ctl.get(); // 重新获取ctl,32位int
//重新获取的当前状态是否与之前获取的状态一致
if (runStateOf(c) != rs)
//不一致则结束内侧循环,进行下一次外侧循环重新获取状态
continue retry;
}
}
boolean workerStarted = false; // 表示工作线程还未启动
boolean workerAdded = false; // 表示工作线程还未添加成功
Worker w = null; // Worker是工作线程
try {
w = new Worker(firstTask); // 创建工作线程
final Thread t = w.thread; // 获取到线程对象
if (t != null) { // 一般不会为null
// 加锁是为了避免在创建过程中其它线程把线程池停掉
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get()); // 重新获取线程池状态
// 如果线程池是RUNNING状态或(SHUTDOWN状态且firstTask为空)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 检查线程t是否已经启动,应该还没有启动
throw new IllegalThreadStateException();
// 将工作线程添加到工作线程集合(HashSet集合)中
workers.add(w);
int s = workers.size();
// 记录工作线程数量达到过的最大值
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动线程
workerStarted = true; // 线程启动成功
}
}
} finally {
if (! workerStarted) // 启动失败则执行此方法
addWorkerFailed(w);
}
return workerStarted;
}
6 Worker方法
worker类实现了Runnable接口,实现了run方法
Worker()
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
runWork()
final void runWorker(ThreadPoolExecutor.Worker w) {
// 获取当前线程
Thread wt = Thread.currentThread();
// 拿到任务
Runnable task = w.firstTask;
// AQS先不关注
w.firstTask = null;
w.unlock(); // allow interrupts
// 标识为true
boolean completedAbruptly = true;
try {
// 有任务则执行
// 没任务则getTask()从阻塞队列获取任务,拿不到则阻塞在此,直到超时或核心线程一直阻塞
while (task != null || (task = getTask()) != null) {
w.lock(); // 加锁,避免shutdown任务中断,shutdown需要拿到全局锁
if ((runStateAtLeast(ctl.get(), STOP) || // 当前状态大于等于STOP
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 自定义前值增强
Throwable thrown = null;
try {
task.run(); // 执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 自定义后置增强
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程执行之后的后续处理
processWorkerExit(w, completedAbruptly);
}
7. 线程池的创建方式
// 7 种创建发方式
//最基础的创建方式
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
1, //corePoolSize 核心线程数
10, //maximumPoolSize 最大线程数
10L, //keepAliveTime 空闲线程最大存活时间
TimeUnit.MINUTES, //timeUtil 存活时间的时间单位
new ArrayBlockingQueue<Runnable>(50), //workQueue 阻塞队列
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return null;
}
},
new ThreadPoolExecutor.DiscardOldestPolicy() //handler 拒绝策略
);
//其余6种方式
//一个活动线程,无边界队列,任务一个一个排序执行
ExecutorService singlePool = Executors.newSingleThreadExecutor();
//缓存线程以便于重用,缓存空了则新建线程,线程空闲60s则踢出缓存,处理大量短时任务,消耗资源少
ExecutorService cachePool = Executors.newCachedThreadPool();
//创建指定线程数的线程池,定时执行任务
Executors.newScheduledThreadPool(4);
//保持nThread个活动线程,有线程退出则新建补齐,任务过多时多余的任务存阻塞队列
Executors.newFixedThreadPool(4);
//并行处理任务,不保证顺序,java8才加入此创建方法
Executors.newWorkStealingPool();
//创建单线程池,定时执行任务
Executors.newSingleThreadScheduledExecutor();
最后
以上就是魁梧毛衣为你收集整理的ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式的全部内容,希望文章能够帮你解决ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复