我是靠谱客的博主 甜蜜钢笔,这篇文章主要介绍线程及线程池技术原理分析,现在分享给大家,希望可以做个参考。

线程

线程是调度CPU资源的最小单位。java线程与OS线程保持1:1的映射关系,也就是一个java线程会对应操作系统中的一个线程

jdk中定义了线程的6种状态:

NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public enum State { /** * Thread state for a thread which has not yet started. */ NEW, /** * Thread state for a runnable thread. A thread in the runnable * state is executing in the Java virtual machine but it may * be waiting for other resources from the operating system * such as processor. */ RUNNABLE, /** * Thread state for a thread blocked waiting for a monitor lock. * A thread in the blocked state is waiting for a monitor lock * to enter a synchronized block/method or * reenter a synchronized block/method after calling * {@link Object#wait() Object.wait}. */ BLOCKED, /** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. * * For example, a thread that has called <tt>Object.wait()</tt> * on an object is waiting for another thread to call * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on * that object. A thread that has called <tt>Thread.join()</tt> * is waiting for a specified thread to terminate. */ WAITING, /** * Thread state for a waiting thread with a specified waiting time. * A thread is in the timed waiting state due to calling one of * the following methods with a specified positive waiting time: * <ul> * <li>{@link #sleep Thread.sleep}</li> * <li>{@link Object#wait(long) Object.wait} with timeout</li> * <li>{@link #join(long) Thread.join} with timeout</li> * <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li> * <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li> * </ul> */ TIMED_WAITING, /** * Thread state for a terminated thread. * The thread has completed execution. */ TERMINATED; }

根据jdk Thread State状态及注释,总结线程各种状态的变化:

协程 

协程即用户级线程,目的是最大化的发挥硬件性能和提升软件的速度。基本原理:在某个点挂起当前的任务,等任务完成或者到达某个条件时,再还原栈信息并继续执行(整个过程不需要上下文切换) 

线程池——ThreadPoolExecutor

线程池:可以直接理解为一个线程缓存。 

在java web开发中,如果每个请求都需要创建一个线程去执行任务,并发请求很高,但是每个请求执行时间都非常短,这样就会频繁的创建和销毁线程,将导致系统性能大大降低。

对于这种并发大任务执行时间短的场景,线程池的优势就体现出来了:

1、存活线程重用,减少线程创建和销毁的开销,提高性能

2、提高响应速度。当任务提交后,可以不用等到线程创建就能立即执行

3、提高线程的可管理性。

java线程池7个参数的理解 

int corePoolSize——核心线程数,线程池一直存活的线程数量,即使这些线程是空闲状态
int maximumPoolSize——线程池允许的最大线程数
long keepAliveTime——当线程池中线程数量超过核心线程数,多余的线程在终止前等待新任务的最长时间
TimeUnit unit——时间单位
BlockingQueue<Runnable> workQueue——存放任务的阻塞队列,只存放由execute()提交的任务
ThreadFactory threadFactory——线程池使用创建线程的线程工厂
RejectedExecutionHandler handler——拒绝策略。当线程数量和队列容量都满了,使用该handler来处理当前的任务。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

线程池状态和数量的说明:

引用ctl是一个并发安全的AtomicInteger计数器,它通过Integer的高三位和低29位维护了线程池的状态(RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED)和线程的数量, 分别通过runStateOf(int)、workerCountOf(int)和ctlOf(int,int)获取线程状态、工作线程数量和Integer的大小。

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //高三为111 private static final int SHUTDOWN = 0 << COUNT_BITS; //高三为000 private static final int STOP = 1 << COUNT_BITS; //高三为001 private static final int TIDYING = 2 << COUNT_BITS; //高三为010 private static final int TERMINATED = 3 << COUNT_BITS; //高三为011 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池核心方法execute()

核心代码说明及流程

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }

 代码分析——主要分为三步:

1、工作线程数量小于核心线程数,则调用addWorker(Runnable,boolean)创建Worker,并将任务传递进去,通过线程工厂类创建工作线程,启动工作线程去执行该任务。

2、如果工作线程数量大于等于核心线程数,则先判断线程池是否在RUNNING状态,是则尝试将任务丢到队列中,否则直接直接调用addWorker()尝试执行任务

3、如果队列已满(2)中将任务入队失败,则会直接使用设置的任务淘汰策略处理任务

默认有四种淘汰策略:

(1)、AbortPolicy抛异常

(2)、CallerRunsPolicy使用当前线程执行任务

(3)、DiscardOldestPolicy淘汰最老的任务,添加新任务

(4)、DiscardPolicy啥也不干

核心方法addWorker(Runnable, boolean)

主要做了两件核心的事:

1、校验通过后,使用for循环确保线程池中线程数量+1;

2、创建Worker,Worker维护了任务和使用线程工厂创建的线程,并将Worker添加到worker的set集合中维护起来,最后启动线程执行任务。

其他异常情况:线程池是STOP状态或者可以被置为SHUTDOWN状态,该方法将返回false;另外,如果线程工厂创建线程失败、返回null或者抛出异常,将会执行回滚操作,回滚操作包括将worker从集合中移除,线程数量-1。

工作线程类Worker

1、new Worker(Runnable):维护任务和使用线程工厂创建的线程,并将自己作为一个任务传递给线程。

2、当线程启动后,就会执行run()方法,并执行runWorker(this)方法。

3、runWorker(Worker):while循环获取任务,如果当前worker中维持的任务不为null,则使用工厂线程执行任务;否则firstTask为null,通过getTask()从队列中获取任务去执行。如果没有获取到任务则退出循环执行processWorkerExit()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } }

getTask()方法

getTask 方法核心代码

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }

分析:

allowCoreThreadTimeOut默认为false,只有当线程池线程数量超过核心线程数,就通过timed来标识当前线程是需要超时回收的线程。此时假设线程池线程数量超过核心线程数,调用阻塞队列的workQueue#poll(int, TimeUnit)超时方法来获取任务,如果在超时时间获取到任务,则返回任务;否则表示阻塞任务队列中已经为空了,需要将线程进行回收,执行timedOut = true表示线程需要被清理,而且线程池中存在线程并且队列是空的,通过CAS保证线程池中线程数量减1,并返回空。如果线程池中线程数量没有超过核心线程数,则调用workQueue.take()会一直阻塞在队列中(阻塞是通过条件队列调用await()实现的),直到有新任务被添加到队列中。

processWorkerExit()方法

核心代码

复制代码
1
2
3
4
5
6
try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); }

该方法在前面会处理异常情况,如果执行任务时出现异常则会将线程数量减1,否则不做处理,因为在getTask()方法中已经处理了。随后将完成的任务数+1,将当前工作者线程从工作者线程的集合中移除,这就意味着当前线程生命周期结束,将被jvm垃圾回收器回收。

线程池核心方法submit()和get()

FutureTask 

FutureTask实现了RunnableFuture,RunnableFuture分别继承了Future和Runnable。FutureTask封装了任务callable和执行状态state,state用来标识任务执行状态,在get()时用此标识来判断是否阻塞自己,并且还维护了WaitNode构建的单向链表用来记录WAITING状态的线程(CAS头插法)。

submit()方法:将任务封装为FutureTask,并调用execute(Runnable)方法执行任务。如果通过submit()提交Runnable无返回的任务,最后get()将返回null.

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }

get()方法:

复制代码
1
2
3
4
5
6
7
8
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //构建单向链表,记录WAITING线程。 s = awaitDone(false, 0L); //被唤醒后,获取任务执行结果 return report(s); }

最后总结:

submit()和execute()的区别

submit()最终会调用execute()执行任务,并且会返回结果。 

最后

以上就是甜蜜钢笔最近收集整理的关于线程及线程池技术原理分析的全部内容,更多相关线程及线程池技术原理分析内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(70)

评论列表共有 0 条评论

立即
投稿
返回
顶部