概述
java并发队列之ArrayBlockingQueue(二)
ArrayBlockingQueue是一个用数组实现的有界阻塞并发安全队列.
demo实战
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
//队列总长度为10.
final BlockingQueue<String> deque = new ArrayBlockingQueue<>(10);
Runnable producerRunnable = new Runnable() {
int i = 0;
public void run() {
while (true) {
i++;
try {
System.out.println("我生产了一个===" + i);
deque.put(i + "dddd");
Thread.sleep(100); //可以修改这里的等候时间和消费着的等候时间来看阻塞情况.
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Runnable customerRunnable = new Runnable() {
public void run() {
while (true) {
try {
System.out.println("我消费了一个===" + deque.take());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
Thread thread1 = new Thread(producerRunnable);
thread1.start();
Thread thread2 = new Thread(customerRunnable);
thread2.start();
}
}
解说: 消费者2s才能消费一个元素,而生产者则100ms生产一个.生产到第12个元素的时候总的队列长度达到了10(前面2个被消费掉了).这个时候就会阻塞.生产者就进入阻塞状态,只有等消费者消费之后才能再次压入队列.
同样的,如果把消费者的时间改为100ms,生产者的时间改为2s,则消费者发现队列里面元素为空时,则消费者进入阻塞状态.
上面的例子中有阻塞,有有界(队列长度).这就是一个有界阻塞队列.
ArrayBlockingQueue特性
- 创建ArrayBlockingQueue时必须设置队列大小.超过队列大小时则阻塞.
- 独占锁lock(默认为非公平锁)用来保证出、入队操作的原子性,这保证了同时只有一个线程可以进行入队、出队操作
- 使用生产者,消费者模式来设计的一个队列.
- 先进先出队列
源码分析
初始化
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//定义队列大小
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
//队列空的时候阻塞
notEmpty = lock.newCondition();
//队列满的时候阻塞
notFull = lock.newCondition();
}
压入队列put阻塞方法
public void put(E e) throws InterruptedException {
//队列中元素不能为空
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中断锁
lock.lockInterruptibly();
try {
//可以看到这里,当元素达到设置的总长度时,则阻塞起来;
//这也是有界队列的最好证明
while (count == items.length)
notFull.await(); //线程挂起
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
//从0开始
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
//计算下标指针.每次+1.
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
压入队列offer,add非阻塞方法
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当队列满时,返回false不做任何操作,不阻塞.
if (count == items.length)
return false;
else {
insert(e);
return true;
}
} finally {
lock.unlock();
}
}
从put,offer方法中可以看出,一个是阻塞的一个是非阻塞的.
个人观点,如果我们在使用ArrayBlockingQueue的时候,压入队列元素,使用offer方法而不是使用put方法,那是否可以说ArrayBlockingQueue是一个非阻塞队列呢.我的答案:no也对,yes也对. 阻塞非阻塞,取决于你是否使用阻塞方法,如果你没有使用阻塞方法那就不阻塞了.当然offer什么也不做直接返回false,说明在使用offer方法时ArrayBlockingQueue只是一个普通的有界队列,也就失去了阻塞队列的意义了但以此同时也没有非阻塞队列的可以无限增加元素的功能.如果不使用put方法的ArrayBlockingQueue队列就是一个废弃物.
弹出队列take
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空则阻塞,线程进入等待状态
while (count == 0)
notEmpty.await();
//弹出一个元素
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null; //弹出元素
takeIndex = inc(takeIndex); //遍历指针往下移一位
--count; //总数-1
notFull.signal(); //解锁put中的await
return x;
}
ArrayBlockingQueue通过使用全局独占锁实现同时只能有一个线程进行入队或者出队操作,这个锁的粒度比较大,有点类似在方法上添加synchronized的意味。其中offer,poll操作通过简单的加锁进行入队出队操作,而put,take则使用了条件变量实现如果队列满则等待,如果队列空则等待,然后分别在出队和入队操作中发送信号激活等待线程实现同步。另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的结果是精确的,因为计算前加了全局锁。
最后
以上就是清秀棒棒糖为你收集整理的java 并发队列之ArrayBlockingQueue(二)java并发队列之ArrayBlockingQueue(二)的全部内容,希望文章能够帮你解决java 并发队列之ArrayBlockingQueue(二)java并发队列之ArrayBlockingQueue(二)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复