我是靠谱客的博主 干净山水,这篇文章主要介绍Java基础——BlockingQueue源码分析之ArrayBlockingQueue,现在分享给大家,希望可以做个参考。

BlockingQueue是什么

  • BlockingQueue是一个阻塞队列的接口
  • BlockingQueue是线程安全的
  • BlockingQueue具有先进先出的特点
  • 当队列满的时候进行入队操作会阻塞,当队列空的时候进行出队操作会阻塞

BlockingQueue提供的接口

BlockingQueue提供的接口有四种不同的方法,具体如下表所示

操作Throws ExceptionSpecial ValueBlocksTimes Out
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
删除remove(o)poll()take()poll(timeout, timeunit)
查询element()peek()--

这四种不同的方法对应的特点分别是

  1. ThrowsException:如果操作不能马上进行,则抛出异常
  2. SpecialValue:如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
  3. Blocks:如果操作不能马上进行,操作会被阻塞
  4. TimesOut:如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值

ArrayBlockingQueue是什么

  • ArrayBlockingQueue是一个基于数组且有界的阻塞队列,即容量固定,不存在扩容
  • ArrayBlockingQueue是线程安全的
  • ArrayBlockingQueuee具有先进先出的特点
  • 当队列满的时候进行入队(put)操作会阻塞,当队列空的时候进行出队(take)操作会阻塞
复制代码
1
2
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

ArrayBlockingQueue成员变量

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
final Object[] items; int takeIndex; int putIndex; int count; /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;

由于ArrayBlockingQueue是基于数组且有界的阻塞队列,因此需要个数组进行存储,需要两个指针索引记录队列的头地址和尾地址。其中有个锁是为了保证线程安全的,而两个Condition则是用来阻塞队列的

ArrayBlockingQueue构造方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

构造函数的参数

  1. capacity:指定容量的大小
  2. fair:指定是否使用公平锁,所谓的公平锁就是先等待的线程先获得锁

ArrayBlockingQueue的存储

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void put(E e) throws InterruptedException { Objects.requireNonNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获取锁 try { while (count == items.length)//如果已满,等待 notFull.await(); enqueue(e);//如果没满,入队 } finally { lock.unlock();//释放锁 } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();//通知有人入队,notEmpty表示队列不会为空 }

存储的逻辑还是挺简单的

  1. 通过对put方法加锁保证线程的安全
  2. 判断队列的情况,如果满了则等待,没满则入队
  3. 入队后,通知队列的notEmpty,表示队列不会为空

ArrayBlockingQueue的获取

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//获取锁 try { while (count == 0)//如果没元素,等待 notEmpty.await(); return dequeue();//如果有元素,出队 } finally { lock.unlock();//释放锁 } } private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal();//通知有人出队,notFull表示队列不会溢满 return x; }

获取的逻辑基本和存储的逻辑反过来

  1. 通过对take方法加锁保证线程的安全
  2. 判断队列的情况,如果没元素则等待,如果有元素则出队
  3. 出队后,通知队列的notFull,表示队列不会溢满

这里穿插下ReentrantLock的lock和lockInterruptibly的区别

  1. 当线程有可能被其他线程中断时,lock方法会忽略中断请求,继续获取锁直到成功
  2. 当线程有可能被其他线程中断时,lockInterruptibly方法则直接抛出中断异常来立即响应中断

总结

  1. ArrayBlockingQueue队列是基于数组和Condition类来实现的
  2. ArrayBlockingQueue的存储和获取采用生产消费模式
  3. ArrayBlockingQueue的线程安全是通过ReentrantLock来保证的
  4. ArrayBlockingQueue的队列中不允许元素为null

最后

以上就是干净山水最近收集整理的关于Java基础——BlockingQueue源码分析之ArrayBlockingQueue的全部内容,更多相关Java基础——BlockingQueue源码分析之ArrayBlockingQueue内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(66)

评论列表共有 0 条评论

立即
投稿
返回
顶部