概述
线程池执行过程
调用execute(task)方法底层执行步骤
-
1.首先检查线程池的运行状态和工作线程数量,如果工作线程总数(从ctl变量中获取线程并统计)少于核心线程数,则会创建一个新的线程来执行给的的任务,通过调用addWorker来执行任务.
-
2.如果线程处于运行状态且工作队列能够入队新的任务,则使用double-check机制再次判断是否处于运行状态及是否能够出队任务,如果不成立,则使用拒绝策略拒绝新提交的任务,如果double-check的工作线程数量为0,则添加worker线程。
-3.如果线程池处于shutdown或饱和状态,则创建带有firstTask的worker线程失败,则拒绝新进任务。 -
4.对应部分源码如下
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); } } ````
线程池addWorker执行过程
- 1.boolean addWorker(Runnable firstTask, boolean core),检查是否可以根据当前线程池和给定的绑定添加来添加一个新的worker线程。返回true表示接收了任务,否则表示拒绝任务
- 具体任务是,获取线程池线程及整体,判断线程池工作队列是否为空,如果为空,则返回false(创建worker失败);如果队列不为空,则判断线程是否大于等于线程池容量或大于核心线程数量或最大线程数量,则返回false,否则进行线程池数量自增1,进行woker创建
- 部分源码如下,判断线程池状态及工作队列是否为空
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
- 2.判断线程池状态、工作队列后,进行worker线程创建.首先,worker线程创建成功后,会获取worker对应的Thread。然后,获取线程池对应的重入锁资源,判断线程池工作状态,如果是处于SHUTDOWN或处于SHUTDOWN且提交任务为空,则进一步判断worker的线程状态是否存活,若存活则抛出异常;如果线程池状态处于RUUNNING且提交firstTask不为空,则将新建worker提交到对应的HashSet在,最后是否所资源
- 源码如下
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); }
- 源码如下
- 3.worker运行机制,实际执行方法,ThreadPoolExecutor#runWorker,最终执行我们最开始创建的Runnable对象的run方法。执行任务时,Java充分为各阶段预留好hook,方便自定义业务进行定制,如beforeExecute()【自定义执行run方法之前的预处理工作】,afterExecute()【执行run方法后的收尾工作】
- 源码如下
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取worker对象的firstTask Runnable task = w.firstTask; w.firstTask = null; //释放worker对象锁资源,运行中断,提高并发量 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //判断任务是否为空,不空则进行锁资源获取,执行具体任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //如果线程池处于停止状态,判断是否处于中断;如果未停止,则判断当前线程是否处于中断,并且需要再次检查当清理中断时处理shutdownNow race if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
- 源码如下
最后
以上就是开朗山水为你收集整理的线程池excute方法执行底层过程的全部内容,希望文章能够帮你解决线程池excute方法执行底层过程所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复