我是靠谱客的博主 奋斗酸奶,这篇文章主要介绍阻塞队列与线程池总结一、阻塞队列二、线程池相关,现在分享给大家,希望可以做个参考。

阻塞队列与线程池总结

  • 一、阻塞队列
    • 1.1 阻塞队列接口结构和实现类
    • 1.2 BlockingQueue核心方法
    • 1.3 阻塞队列API的使用
    • 1.4 阻塞队列之同步SynchronousQueue队列
    • 1.5 生产者消费者模式(lock实现)
    • 1.5 生产者和消费者(阻塞队列实现)
  • 二、线程池相关
    • 2.1 Callable接口
    • 2.2 线程池的架构及优势
    • 2.3 线程池的使用
    • 2.4 线程池7大参数
    • 2.5 线程池底层工作原理
    • 2.6 拒绝策略
    • 2.7 为什么不用默认创建的线程池?
    • 2.8 手写线程池(ThreadPoolExecutor)
    • 2.9 线程池配置合理线程数

一、阻塞队列

1.1 阻塞队列接口结构和实现类

阻塞队列,排队拥堵,首先它是一个队列,而一个阻塞队列在数据结构中所起的作用大致如下图所示:

在这里插入图片描述

线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素

  • 当阻塞队列是空时,从队列中获取元素的操作将会被阻塞

    • 当蛋糕店的柜子空的时候,无法从柜子里面获取蛋糕
  • 当阻塞队列是满时,从队列中添加元素的操作将会被阻塞

    • 当蛋糕店的柜子满的时候,无法继续向柜子里面添加蛋糕了

也就是说 试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其它线程往空的队列插入新的元素

同理,试图往已经满的阻塞队列中添加新元素的线程,直到其它线程往满的队列中移除一个或多个元素,或者完全清空队列后,使队列重新变得空闲起来,并后续新增

为什么要用?

在多线程领域:所谓的阻塞,在某些情况下会挂起线程(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒

为什么需要BlockingQueue?

好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都帮你一手包办了

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己取控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。

架构介绍

BlockingQueue阻塞队列是属于一个接口,底下有七个实现类

  • ArrayBlockQueue:由数组结构组成的有界阻塞队列
  • LinkedBlockingQueue:由链表结构组成的有界(但是默认大小 Integer.MAX_VALUE)的阻塞队列
    • 有界,但是界限非常大,相当于无界,可以当成无界
  • PriorityBlockQueue:支持优先级排序的无界阻塞队列
  • DelayQueue:使用优先级队列实现的延迟无界阻塞队列
  • SynchronousQueue:不存储元素的阻塞队列,也即单个元素的队列
    • 生产一个,消费一个,不存储元素,不消费不生产
  • LinkedTransferQueue:由链表结构组成的无界阻塞队列
  • LinkedBlockingDeque:由链表结构组成的双向阻塞队列

1.2 BlockingQueue核心方法

方法类型抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e,time,unit)
移除remove()poll()take()poll(time,unit)
检查element()peek()不可用不可用
性质说明
抛出异常当阻塞队列满时:在往队列中add插入元素会抛出 IIIegalStateException:Queue full
当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException
特殊性插入方法,成功true,失败false
移除方法:成功返回出队列元素,队列没有就返回空
一直阻塞当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出。
当阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用。
超时退出当阻塞队列满时,队里会阻塞生产者线程一定时间,超过限时后生产者线程会退出

1.3 阻塞队列API的使用

  1. 抛出异常组

    但执行add方法,向已经满的ArrayBlockingQueue中添加元素时候,会抛出 IIIegalStateException:Queue full,当阻塞队列空时:再往队列中remove移除元素,会抛出NoSuchException

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class BlockingQueueExceptionDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c")); try { //抛出 java.lang.IllegalStateException: Queue full System.out.println(blockingQueue.add("XXX")); } catch (Exception e) { System.err.println(e); } System.out.println(blockingQueue.element());//a System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); try { //抛出 java.util.NoSuchElementException System.out.println(blockingQueue.remove()); } catch (Exception e) { System.err.println(e); } try { //element()相当于peek(),但element()会抛NoSuchElementException System.out.println(blockingQueue.element()); } catch (Exception e) { System.err.println(e); } } }

    执行结果:

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    true true true java.lang.IllegalStateException: Queue full a a b c java.util.NoSuchElementException java.util.NoSuchElementException
  2. 布尔类型组

    我们使用 offer的方法,添加元素时候,如果阻塞队列满了后,会返回false,否者返回true

    在poll取元素的时候,如果队列已空,那么会返回null

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    BlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());

    运行结果:

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    true true true false a b c null
  3. 阻塞队列组

    当阻塞队列满时,生产者继续往队列里put元素,队列会一直阻塞生产线程直到put数据or响应中断退出
    阻塞队列空时,消费者线程试图从队列里take元素,队列会一直阻塞消费者线程直到队列可用

    一般在消息中间件,比如RabbitMQ中会使用到,因为需要保证消息百分百不丢失,因此只有让它阻塞

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); System.out.println("================"); blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); blockingQueue.take();//将被阻塞
  4. 超时退出组
    使用offer插入的时候,需要指定时间,如果2秒还没有插入,那么就放弃插入;同时取的时候也进行判断,如果2秒内取不出来,那么就返回null

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class BlockingQueueTimeoutDemo { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); System.out.println("Offer."); System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.offer("d", 2L, TimeUnit.SECONDS)); System.out.println("Poll."); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); System.out.println(blockingQueue.poll(2L, TimeUnit.SECONDS)); } }

    输出结果:

    复制代码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Offer. true true true false Poll. a b c null

1.4 阻塞队列之同步SynchronousQueue队列

SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

下面我们测试SynchronousQueue添加元素的过程
首先我们创建了两个线程,一个线程用于生产,一个线程用于消费
生产的线程分别put了 A、B、C这三个字段

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.zb.demo; import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new SynchronousQueue<>(); new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "t put A "); blockingQueue.put("A"); System.out.println(Thread.currentThread().getName() + "t put B "); blockingQueue.put("B"); System.out.println(Thread.currentThread().getName() + "t put C "); blockingQueue.put("C"); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(() -> { try { try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "t take A "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "t take B "); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } blockingQueue.take(); System.out.println(Thread.currentThread().getName() + "t take C "); } catch (InterruptedException e) { e.printStackTrace(); } }, "t2").start(); } }

执行结果:

复制代码
1
2
3
4
5
6
7
t1 put A t2 take A t1 put B t2 take B t1 put C t2 take C

我们从最后的运行结果可以看出,每次t1线程向队列中添加阻塞队列添加元素后,t1输入线程就会等待 t2消费线程,t2消费后,t2处于挂起状态,等待t1在存入,从而周而复始,形成 一存一取的状态

1.5 生产者消费者模式(lock实现)

一个初始值为0的变量,两个线程对其交替操作,一个加1,一个减1,来5轮
关于多线程的操作,我们需要记住下面几句

  • 线程 操作 资源类
  • 判断 干活 通知
  • 防止虚假唤醒机制

完整代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class ShareData { private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment() throws Exception{ // 加锁 lock.lock(); try { // 判断 while(number != 0) { // 等待不能生产 condition.await(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + "t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock();//释放锁要放在finally中 } } public void decrement() throws Exception{ // ,加锁 lock.lock(); try { // 判断 while(number == 0) { // 等待不能消费 condition.await(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + "t " + number); // 通知 唤醒 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } } public class prodConsumer_TraditionDemo { public static void main(String[] args) { ShareData shareData = new ShareData(); // 生产 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.increment(); } catch (Exception e) { e.printStackTrace(); } } }, "生产者").start(); // 消费 new Thread(() -> { for (int i = 0; i < 5; i++) { try { shareData.decrement(); } catch (Exception e) { e.printStackTrace(); } } }, "消费者").start(); } }

注意:防止虚假唤醒机制

但是我们对共享变量进行判断的时候,为了防止出现虚假唤醒机制,不能使用if来进行判断,而应该使用while。因为当线程被唤醒时候必须再进行一次判断。

复制代码
1
2
3
4
5
6
// 判断 while(number != 0) { // 等待不能生产 condition.await();//当线程从阻塞状态被唤醒的时候必须再进行判断 }

1.5 生产者和消费者(阻塞队列实现)

在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,则这会给我们的程序带来不小的时间复杂度。而使用阻塞队列的话,就不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程

新版的阻塞队列版生产者和消费者,使用:volatile、CAS、atomicInteger、BlockQueue、线程交互、原子引用

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class MyResource{ private volatile boolean flag = true;//true表示生产,false表示消费 private AtomicInteger atomicInteger = new AtomicInteger(); BlockingQueue<String> blockingQueue = null; public MyResource(BlockingQueue<String> blockingQueue){ this.blockingQueue = blockingQueue; } //生产者进行生产 public void myProd() throws Exception { String data = null; boolean retValue; while(flag){ data = atomicInteger.incrementAndGet()+""; retValue = blockingQueue.offer(data, 2L, TimeUnit.SECONDS); if(retValue){ System.out.println(Thread.currentThread().getName()+"t插入队列"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"t插入队列"+data+"失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println(Thread.currentThread().getName()+"t大老板叫停了,表示FLAG=false"); } public void myConsumer() throws Exception{ String result; while(flag){ result = blockingQueue.poll(2L, TimeUnit.SECONDS); if(result == null || "".equalsIgnoreCase(result)){ flag = false; System.out.println(Thread.currentThread().getName()+"t 超过两秒没有取到蛋糕,消费退出"); System.out.println(); System.out.println(); return ; } System.out.println(Thread.currentThread().getName()+"t 消费队列蛋糕"+result+"成功"); } } public void stop(){ this.flag = false; } } public class ProducerConsumerWithBlockingQueueDemo { public static void main(String[] args) { MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(3)); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"t----生产线程启动"); try { myResource.myProd(); } catch (Exception e) { e.printStackTrace(); } },"prod").start(); new Thread(()->{ System.out.println(Thread.currentThread().getName()+"t-----消费者线程启动"); try { myResource.myConsumer(); } catch (Exception e) { e.printStackTrace(); } },"Consumer").start(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } myResource.stop(); System.out.println("n5秒钟时间到,大老板main线程叫停,活动结束n"); } }

最后运行结果:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Consumer -----消费者线程启动 prod ----生产线程启动 prod 插入队列1成功 Consumer 消费队列蛋糕1成功 prod 插入队列2成功 Consumer 消费队列蛋糕2成功 prod 插入队列3成功 Consumer 消费队列蛋糕3成功 prod 插入队列4成功 Consumer 消费队列蛋糕4成功 prod 插入队列5成功 Consumer 消费队列蛋糕5成功 5秒钟时间到,大老板main线程叫停,活动结束 prod 大老板叫停了,表示FLAG=false Consumer 超过两秒没有取到蛋糕,消费退出

二、线程池相关

2.1 Callable接口

  1. callable接口概述

    从Java 5开始,Java提供了Callable接口,该接口怎么看都像是Runnable接口的增强版,Callable接口提供了一个call()方法可以作为线程执行体,但call()方法比run()方法功能更强大。

    • call()方法可以有返回值
    • call()方法可以声明抛出异常

问题是:Callable接口是Java 5新增的接口,而且它不是Runnable接口的子接口,所以Callable对象不能直接作为Thread的target。

  1. Future接口概述

    Java 5提供了Future接口来代表Callable接口里call()方法的返回值,并为Future接口提供了一个FutureTask实现类,该实现类实现了Future接口和Runnable接口可以作为Thread类的target。在Future接口里定义了如下几个公共方法来控制它关联的Callable任务

    • boolcan cancel(boolean maylnterruptltRunning):试图取消该Future里关联的Callable任务
    • V get():返回Callable任务里call()方法的返回值。调用该方法将导致程序阻塞,必须等到子线程结束后才会得到返回值
    • V get(long timeout,TimeUnit unit):返回Callable任务里call()方法的返回值。该方法让程序最多阻塞timeout和unit指定的时间,如果经过指定时间后Callable任务依然没有返回值, 将会抛出TimeoutExccption异常
    • boolean isCancelled():如果在Callable任务正常完成前被取消,则返回true
    • boolean isDone():妇果Callable任务已完成,则返回true

    注意:Callable接口有泛型限制,Callable接口里的泛型形参类型与call()方法返回值类型相同。

  2. 创建并启动有返回值的线程的步骤

    复制代码
    1
    2
    3
    4
    5
    6
    7
    1. 创建一个实现Callable的实现类 2. 实现call方法,将此线程需要执行的操作声明在call()中 3. 创建Callable接口实现类的对象 4. 将此Callable接口实现类的对象作为传递到FutureTask构造器中,创建FutureTask的对象 5. 将FutureTask的对象作为参数传递到Thread类的构造器中,创建Thread对象,并调用start() 6. 调用FutureTask对象的get()方法来获得子线程执行结束后的返回值

代码实例:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()+"come in callable"); return 1024; } } public class CallableDemo { public static void main(String[] args) throws Exception { FutureTask<Integer> futureTask = new FutureTask(new MyThread()); new Thread(futureTask,"AA").start(); //获取call方法的返回值 int result = futureTask.get();//该方法将导致主线程被阻塞,直到call()方法结束并返回为止 System.out.println(result); } }

2.2 线程池的架构及优势

架构说明:

Java中线程池是通过Executor框架实现的,该框架中用到了Executor,Executors(代表工具类),ExecutorService,ThreadPoolExecutor这几个类。

为什么用线程池

线程池做的主要工作就是控制运行的线程的数量,处理过程中,将任务放入到队列中,然后线程创建后,启动这些任务,如果线程数量超过了最大数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:线程复用、控制最大并发数、管理线程

线程池的好处

  • 降低资源消耗。通过重复利用已创建的线程,降低线程创建和销毁造成的消耗
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就立即执行
  • 提高线程的可管理性。线程是稀缺资源,如果无线创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

2.3 线程池的使用

  • Executors.newFixedThreadPool(int i) :创建一个拥有 i 个线程的线程池
    • 执行长期的任务,性能好很多
    • 创建一个定长线程池,可控制线程数最大并发数,超出的线程会在队列中等待
  • Executors.newSingleThreadExecutor():创建一个只有1个线程的单线程池
    • 一个任务一个任务执行的场景
    • 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序执行
  • Executors.newCachedThreadPool(); 创建一个可扩容的线程池
    • 执行很多短期异步的小程序或者负载教轻的服务器
    • 创建一个可缓存线程池,如果线程长度超过处理需要,可灵活回收空闲线程,如无可回收,则新建新线程
  • Executors.newScheduledThreadPool(int corePoolSize):线程池支持定时以及周期性执行任务,创建一个corePoolSize为传入参数,最大线程数为整形的最大数的线程池

使用代码:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyThreadPoolDemo { public static void main(String[] args) { //一池5个处理线程(用池化技术,一定要记得关闭) ExecutorService threadPool1 = Executors.newFixedThreadPool(5); //创建一个只有一个线程的线程池 ExecutorService threadPool2 = Executors.newSingleThreadExecutor(); //创建一个拥有N个线程的线程池,根据调度创建合适的线程 ExecutorService threadPool = Executors.newCachedThreadPool(); try{ //模拟10个用户来办理业务,每个用户就是一个来自外部请求线程 for(int i = 1;i<=10;i++){ threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"t 办理业务"); } }); } }catch(Exception e){ e.printStackTrace(); }finally{ threadPool.shutdown(); } } }

2.4 线程池7大参数

介绍7大参数之前先看一下三个常用的创建线程方法的底层源码:

  • newFixedThreadPool(int nThreads)
    在这里插入图片描述
    底层使用了LinkedBlockingQueue 链表阻塞队列

  • newSingleThreadExecutor()
    在这里插入图片描述

  • newCacheThreadPool()
    在这里插入图片描述
    底层用的是 SynchronousBlockingQueue阻塞队列

我们通过查看源码,发现三个常用的创建线程方法底层都是使用了ThreadPoolExecutor
ThreadPoolExecutor初始化方法的源码:(参数即为线程池的七大参数)
在这里插入图片描述

线程池在创建时的7大参数

  • corePoolSize:核心线程数,线程池中的常驻核心线程数
    • 在创建线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近似理解为今日当值线程
    • 当线程池中的线程数目达到corePoolSize后,就会把到达的队列放到缓存队列中
  • maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1
    • 相当有扩容后的线程数,这个线程池能容纳的最多线程数
  • keepAliveTime:多余的空闲线程存活时间
    • 当线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余的空闲线程会被销毁,直到只剩下corePoolSize个线程为止
    • 默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
  • unit:keepAliveTime的单位
  • workQueue:任务队列,被提交的但未被执行的任务(类似于银行里面的候客区)
    • LinkedBlockingQueue:链表阻塞队列
    • SynchronousBlockingQueue:同步阻塞队列
  • threadFactory:表示生成线程池中工作线程的线程工厂,用于创建线程池 一般用默认即可
  • handler:拒绝策略,表示当队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize3)时,如何来拒绝请求执行的Runnable的策略

2.5 线程池底层工作原理

在这里插入图片描述
在这里插入图片描述

  1. 在创建了线程池后,等待提交过来的任务请求
  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断
    1. 如果正在运行的线程池数量小于corePoolSize,那么马上创建线程运行这个任务
    2. 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列
    3. 如果这时候队列满了,并且正在运行的线程数量还小于maximumPoolSize,那么还是创建非核心线程like运行这个任务;
    4. 如果队列满了并且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行
  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行
  4. 当一个线程无事可做操作一定的时间(keepAliveTime)时,线程池会判断:
    1. 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉
    2. 所以线程池的所有任务完成后,它会最终收缩到corePoolSize的大小

2.6 拒绝策略

等待队列也已经排满了,再也塞不下新任务了同时,线程池中的max线程也达到了,无法继续为新任务服务。这时候我们就需要拒绝策略机制合理的处理这个问题。

JDK拒绝策略:

  • AbortPolicy:默认,直接抛出RejectedExcutionException异常,阻止系统正常运行
  • DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常,如果运行任务丢失,这是一种好方案
  • CallerRunsPolicy:该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务

以上内置拒绝策略均实现了RejectedExecutionHandler接口。

2.7 为什么不用默认创建的线程池?

问题:线程池创建的方法有:固定数的,单一的,可变的,那么在实际开发中,应该使用哪个?

答:我们一个都不用,在生产环境中是使用自己自定义的

为什么不用 Executors 中JDK提供的?

根据阿里巴巴手册规定:
在这里插入图片描述

2.8 手写线程池(ThreadPoolExecutor)

从上面我们知道,因为默认的Executors创建的线程池,底层都是使用LinkBlockingQueue作为阻塞队列的,而LinkBlockingQueue虽然是有界的,但是它的界限是 Integer.MAX_VALUE 大概有20多亿,可以相当是无界的了,因此我们要使用ThreadPoolExecutor自己手动创建线程池,然后指定阻塞队列的大小

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MyThreadPool { public static void main(String[] args) { System.out.println(Runtime.getRuntime().availableProcessors()); ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());//拒绝策略 try{ //模拟10个用户来办理业务,每个用户就是一个来自外部请求线程 for(int i = 1;i<=10;i++){ threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"t 办理业务"); } }); } }catch(Exception e){ e.printStackTrace(); }finally{ threadPool.shutdown(); } } }

2.9 线程池配置合理线程数

生产环境中如何配置 corePoolSize 和 maximumPoolSize
这个是根据具体业务来配置的,分为CPU密集型和IO密集型

  • CPU密集型
  • CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行
  • CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程)
  • 而在单核CPU上,无论你开几个模拟的多线程该任务都不可能得到加速,因为CPU总的运算能力就那些
  • CPU密集型任务配置尽可能少的线程数量
  • 一般公式:CPU核数 + 1个线程数
  • IO密集型

方法一:

  • 由于IO密集型任务线程并不是一直在执行任务,则可能多的线程,如 CPU核数 * 2
  • IO密集型,即该任务需要大量的IO操作,即大量的阻塞
  • 在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力花费在等待上
  • 所以IO密集型任务中使用多线程可以大大的加速程序的运行,即使在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

方法二:

  • IO密集时,大部分线程都被阻塞,故需要多配置线程数:
  • 参考公式:CPU核数 / (1 - 阻塞系数) 阻塞系数在0.8 ~ 0.9左右
  • 例如:8核CPU:8/ (1 - 0.9) = 80个线程数

最后

以上就是奋斗酸奶最近收集整理的关于阻塞队列与线程池总结一、阻塞队列二、线程池相关的全部内容,更多相关阻塞队列与线程池总结一、阻塞队列二、线程池相关内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(85)

评论列表共有 0 条评论

立即
投稿
返回
顶部