我是靠谱客的博主 坚强红酒,这篇文章主要介绍看了这篇文章,还不理解线程池执行流程,过来找我要钱,现在分享给大家,希望可以做个参考。

目录

    • 为什么使用线程池
    • 线程池创建
    • 核心参数
    • 提交任务流程
    • 任务什么时候执行

为什么使用线程池

实际编程中,频繁创建和销毁线程开销很大,所以一般使用线程的方式是线程池。

很方便的,java给我们提供了现成的线程池创建函数ThreadPoolExecutor,这个创建函数也成了不少公司面试必考题,当然,要想彻底理清线程池执行过程,需要剖析源码,这里我们就来仔细分析分析。

线程池创建

首先是线程池创建函数。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

可以把线程池类比成一个小公司,公司有少量正式员工,执行平时的一些工作量。如果工作量太大,正式员工忙不过来,会雇佣部分外包人员。
原则上,如果任务量突然变多,先把任务临时缓存起来,等正式员工有空闲时,交由正式员工由处理(节约成本嘛)。如果缓存队列满了,这时候就要考虑找外包人员了。
公司总预算有限,所以正式员工数量是固定的,且雇佣的外包人员也有最大人数限制。
如果工作量变少,为了节约成本,就要释放部分外包人员。
如果工作量实在太大了,正式员工、外包人员也达到最大预算人数,且所有人都在拼命完成工作任务,这时候,就要拒绝一部分任务了。

核心参数

接下来这几个参数就好理解了。

  • corePoolSize: 核心线程数量,可以类比正式员工数量,常驻线程数量。
  • maximumPoolSize: 最大的线程数量,公司最多雇佣员工数量(包含外包人员)。常驻+临时线程数量。
  • workQueue:任务等待队列,所有的正式员工都在处理任务,再来任务就先放到队列吧,队列如果也满了,那就要找外包了。
  • keepAliveTime:非核心线程空闲时间,就是外包人员等了多久,如果还没有活干,就被解雇了。
  • threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。比如每个员工的名字,工号不一致,方便区分和安排任务。
  • handler:线程池拒绝策,什么意思呢?就是当任务实在是太多,人也不够,需求池也排满了,还有任务咋办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了。

提交任务流程

我们使用线程池的时候,一般是直接调用execute,提交一个任务,对应到线程池里是怎么处理的呢?

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { //关键步骤1 if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { //关键步骤2 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) //关键步骤3 reject(command); }

首先得理解这个ctl是什么意思?

线程池里为了充分利用int型的每一位,使用一个AtomicInteger的ctl来记录线程池中线程的数量及当前线程池的状态。低29bit位表示线程池中线程的数量,高3bit位用来记录线程池的状态是RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED中的一种。

添加任务步骤

  • 获取线程池中线程数量。

  • 线程数量小于核心线程数吗。

    • 小于核心线程数,添加一个核心线程,添加成功的话直接返回。
    • 如果添加核心线程失败,因为是多线程同时执行,再获取一遍线程池中线程数量,继续下一步。
  • 线程数量大于等于核心线程数或上面添加核心线程失败。

    • 线程池还在运行且添加任务到任务队列成功。
      • 重新检查线程池是否还在运行
        • 线程池不在运行,且从任务队列删除任务成功,拒绝该任务。
        • 线程池在运行,但线程池中没有线程(核心线程数也可以设置成0),添加一个非核心线程。
      • 线程池不在运行或添加任务到任务队列失败
        • 尝试添加非核心线程去处理该任务。
        • 如果添加非核心线程失败,拒绝该任务。

整个过程有点绕,可以对比图形,再理解一遍。

在这里插入图片描述

添加任务最关键的函数就是addWorker。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

要理解这个问题,首先要知道java里的标签是怎么用的,有点类似goto的意思,可以参考:https://blog.csdn.net/chanllenge/article/details/90266538

addWorker的核心思想是

  • 添加线程时时,区分核心和非核心线程,并可指定该线程的第一个处理任务。
  • 判断线程池状态及工作队列,线程数量等参数的合法性。
  • 通过CAS自旋方式,增加线程数量。
  • 加悲观锁,并结合参数合法性,添加一个线程worker,到线程队列workers。
  • 线程添加成功且正常启动,返回true,其他情况,添加线程失败,移除该线程,并线程数量减1,返回false。

线程池中,通过一个set来存储所有的线程。

复制代码
1
2
3
4
5
6
/** * Set containing all worker threads in pool. Accessed only when * holding mainLock. */ private final HashSet<Worker> workers = new HashSet<Worker>();

任务什么时候执行

线程添加成功之后,该线程首先会执行指定的第一个处理任务,然后从工作队列的队首依次取任务去执行。

源代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

看到这里,线程池的创建和执行你理解了吗?欢迎留言讨论!

@博乐

最后

以上就是坚强红酒最近收集整理的关于看了这篇文章,还不理解线程池执行流程,过来找我要钱的全部内容,更多相关看了这篇文章内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(92)

评论列表共有 0 条评论

立即
投稿
返回
顶部