我是靠谱客的博主 魁梧毛衣,这篇文章主要介绍ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式,现在分享给大家,希望可以做个参考。

ThreadPoolExecutor

1 Java构建线程的方式

  1. 继承 Thread
  2. 实现 Runnable
  3. 实现 Callable
  4. 线程池方式

2 线程池的7个参数

复制代码
1
2
3
4
5
6
7
8
int corePoolSize, //核心线程数 int maximumPoolSize, //最大线程数 long keepAliveTime, //最大空闲时间 TimeUnit unit, //时间单位 BlockingQueue<Runnable> workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂 RejectedExecutionHandler handler //拒绝策略

3 线程池的执行流程

在这里插入图片描述

4 线程池属性标识

4.1 线程池属性

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 是一个int类型的数值,表示: //1.声明当前线程池的状态(5种) // 高3位:线程池状态 低29位:线程池线程个数 //2.声明线程池中的线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //29,方便后面做位运算 private static final int COUNT_BITS = Integer.SIZE - 3; //通过运算得出 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 111 代表正常接收处理任务 private static final int RUNNING = -1 << COUNT_BITS; // 000 代表不接受新任务,但内部还处理已有任务 private static final int SHUTDOWN = 0 << COUNT_BITS; // executor.shutdownNow 001 不接受新任务,内部任务不再处理,同时中断当前任务 private static final int STOP = 1 << COUNT_BITS; // 010 过渡状态,线程池即将over private static final int TIDYING = 2 << COUNT_BITS; // 011 代表线程池已经结束 private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl // 得到线程池的状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //得到线程池线程数量 private static int workerCountOf(int c) { return c & CAPACITY; }

4.2 线程池状态转换

在这里插入图片描述

5 execute方法执行

execute方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void execute(Runnable command){ //健壮性 if (command == null) throw new NullPointerException(); // 拿到32位的int int c = ctl.get(); //工作线程数 < 核心线程数,则创建核心线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 创建核心线程失败,重新获取ctl,32位的int c = ctl.get(); } // 如果处于RUNNING状态,则添加任务到阻塞队列中 if (isRunning(c) && workQueue.offer(command)) { // 再次获取 ctl int recheck = ctl.get(); // 再次判断是否是RUNNING状态,不是则移除任务 if (! isRunning(recheck) && remove(command)) reject(command); //拒绝策略 //如果处于RUNNING状态,并且工作线程数为0 else if (workerCountOf(recheck) == 0) //阻塞队列有任务但没有工作线程,则添加一个任务为空的工作线程处理阻塞队列中的任务 addWorker(null, false); } // 不处于RUNNING状态则创建非核心线程处理任务 else if (!addWorker(command, false)) //创建失败则拒绝任务 reject(command); }

addWorker方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/** * 创建工作线程 * @param firstTask 传入任务 * @param core 需要创建的是否是核心线程 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: // 此for循环的目的是经过一些判断,给工作线程标识数+1 for (;;) { // 获取32位int int c = ctl.get(); // 获取线程池状态 int rs = runStateOf(c); /* 可能创建成功的条件: 1. RUNNING下可能创建成功 2. SHUTDOWN状态下可以为阻塞队列任务创建工作线程 当传入firstTask为null时,会去处理阻塞队列的任务, 所以只有当 firstTask == null && !workQueue.isEmpty()时才会创建工作线程 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 无需创建工作线程 return false; for (;;) { // 获取工作线程数 int wc = workerCountOf(c); /* 如果工作线程数大于线程池容量(固定容量) 或者需要创建核心线程,但核心线程数已满 或者工作线程数大于最大线程数(自定义,不超过容量) 则退出创建工作线程 */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 工作线程数+1,CAS方式 if (compareAndIncrementWorkerCount(c)) break retry; //如果失败则结束外侧循环 c = ctl.get(); // 重新获取ctl,32位int //重新获取的当前状态是否与之前获取的状态一致 if (runStateOf(c) != rs) //不一致则结束内侧循环,进行下一次外侧循环重新获取状态 continue retry; } } boolean workerStarted = false; // 表示工作线程还未启动 boolean workerAdded = false; // 表示工作线程还未添加成功 Worker w = null; // Worker是工作线程 try { w = new Worker(firstTask); // 创建工作线程 final Thread t = w.thread; // 获取到线程对象 if (t != null) { // 一般不会为null // 加锁是为了避免在创建过程中其它线程把线程池停掉 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 重新获取线程池状态 // 如果线程池是RUNNING状态或(SHUTDOWN状态且firstTask为空) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // 检查线程t是否已经启动,应该还没有启动 throw new IllegalThreadStateException(); // 将工作线程添加到工作线程集合(HashSet集合)中 workers.add(w); int s = workers.size(); // 记录工作线程数量达到过的最大值 if (s > largestPoolSize) largestPoolSize = s; // 工作线程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); // 启动线程 workerStarted = true; // 线程启动成功 } } } finally { if (! workerStarted) // 启动失败则执行此方法 addWorkerFailed(w); } return workerStarted; }

6 Worker方法

worker类实现了Runnable接口,实现了run方法

Worker()

复制代码
1
2
3
4
5
6
7
8
9
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }

runWork()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final void runWorker(ThreadPoolExecutor.Worker w) { // 获取当前线程 Thread wt = Thread.currentThread(); // 拿到任务 Runnable task = w.firstTask; // AQS先不关注 w.firstTask = null; w.unlock(); // allow interrupts // 标识为true boolean completedAbruptly = true; try { // 有任务则执行 // 没任务则getTask()从阻塞队列获取任务,拿不到则阻塞在此,直到超时或核心线程一直阻塞 while (task != null || (task = getTask()) != null) { w.lock(); // 加锁,避免shutdown任务中断,shutdown需要拿到全局锁 if ((runStateAtLeast(ctl.get(), STOP) || // 当前状态大于等于STOP (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 自定义前值增强 Throwable thrown = null; try { task.run(); // 执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 自定义后置增强 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 线程执行之后的后续处理 processWorkerExit(w, completedAbruptly); }

7. 线程池的创建方式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 7 种创建发方式 //最基础的创建方式 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, //corePoolSize 核心线程数 10, //maximumPoolSize 最大线程数 10L, //keepAliveTime 空闲线程最大存活时间 TimeUnit.MINUTES, //timeUtil 存活时间的时间单位 new ArrayBlockingQueue<Runnable>(50), //workQueue 阻塞队列 new ThreadFactory() { @Override public Thread newThread(Runnable r) { return null; } }, new ThreadPoolExecutor.DiscardOldestPolicy() //handler 拒绝策略 ); //其余6种方式 //一个活动线程,无边界队列,任务一个一个排序执行 ExecutorService singlePool = Executors.newSingleThreadExecutor(); //缓存线程以便于重用,缓存空了则新建线程,线程空闲60s则踢出缓存,处理大量短时任务,消耗资源少 ExecutorService cachePool = Executors.newCachedThreadPool(); //创建指定线程数的线程池,定时执行任务 Executors.newScheduledThreadPool(4); //保持nThread个活动线程,有线程退出则新建补齐,任务过多时多余的任务存阻塞队列 Executors.newFixedThreadPool(4); //并行处理任务,不保证顺序,java8才加入此创建方法 Executors.newWorkStealingPool(); //创建单线程池,定时执行任务 Executors.newSingleThreadScheduledExecutor();

最后

以上就是魁梧毛衣最近收集整理的关于ThreadPoolExecutor - 线程池ThreadPoolExecutor1 Java构建线程的方式2 线程池的7个参数3 线程池的执行流程4 线程池属性标识5 execute方法执行6 Worker方法7. 线程池的创建方式的全部内容,更多相关ThreadPoolExecutor内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部