概述
一,关联源码链接
* 并发编程(四):AbstractQueuedSynchronizer源码分析
* 并发编程(五):AQS之重入锁ReentrantLock
* 并发编程(七):AQS之Condition
二,ArrayBlockingQueue 概述
1,ArrayBlockingQueue
ArrayBlockingQueue 是通过数组实现的FIFO(先进先出)有界阻塞队列,数组长度在初始化时指定并固定,不存在数组扩容。ArrayBlockingQueue 内部通过重入锁 ReentrantLock 进行线程加锁,保证数据原子性。阻塞及释放通过两个 Condition 构成 notEmpty 和 notFull 两个实例,通过 Condition 线程通信实现加数据和取数据的线程阻塞及线程唤醒。ArrayBlockingQueue 元素获取基于指针操作,内部维护 putIndex 和 takeIndex 两个指针,对于移除元素操作和添加元素元素都由着两个指针控制操作索引,并顺序后移,具体源码分析;
2,类图
3,常用API
// 初始化,通过构造器重载处理
public ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);
// 取元素:
// 获取当前元素,不阻塞,不移动指针
public E peek();
// 获取当前元素,队列为空返回null,队列不为空返回元素,获取后移除
public E poll();
// 获取当前元素,队列为空阻塞,不为空返回元素,获取后移除
public E take() throws InterruptedException;
// 写元素:
// 添加元素到队列,队列未满插入成功,队列满了直接抛异常
public boolean add(E e);
// 添加元素到队列,队列未满插入成功,队列满了插入失败,返回true/false
public boolean offer(E e);
// 添加元素到队列,队列未满插入成功,队列满了阻塞插入
public void put(E e) throws InterruptedException;
// 移除元素
public boolean remove(Object o);
public boolean removeAll(Collection<?> c);
// 迭代器部分
// 构建迭代
public Iterator<E> iterator();
// 迭代_是否存在下一个
public boolean hasNext();
// 迭代_获取下一个
public E next();
// 迭代_移除
public void remove();
4,功能DEMO
* 此处只是简单演示元素插入和元素获取,以及元素插入和获取关联影响到的阻塞处理
package com.asdc.mtTest_1;
import javax.swing.*;
import java.util.concurrent.ArrayBlockingQueue;
/**
* @author LiYanBin
* @create 2019-10-23 17:57
**/
public class ArrayBlockTest {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
new Thread(() -> {
try {
arrayBlockingQueue.add("add方式");
System.out.println("获取不移除:" + arrayBlockingQueue.peek());
System.out.println("获取并移除:" + arrayBlockingQueue.poll());
System.out.println("阻塞获取前:" + System.currentTimeMillis());
System.out.println("阻塞获取:" + arrayBlockingQueue.take());
System.out.println("阻塞获取后:" + System.currentTimeMillis());
// 沉睡足够时间,等待线程2阻塞插入等待后,消费一次,让阻塞插入成功
Thread.sleep(5000);
System.out.println("获取元素,释放阻塞插入:" + arrayBlockingQueue.take());
System.out.println("获取阻塞插入元素:" + arrayBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(3000);
new Thread(() -> {
try {
System.out.println("添加元素,满后抛异常:" + arrayBlockingQueue.add("add方式"));
System.out.println("add元素被阻塞获取获取后并释放,则offer为true");
System.out.println("添加元素,未满返回true,满后返回false:" + arrayBlockingQueue.offer("offer方式"));
System.out.println("阻塞插入前," + System.currentTimeMillis());
arrayBlockingQueue.put("put方式");
System.out.println("阻塞插入后," + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
二,锁操作及指针操作
1,锁操作
1.1,加锁处理:ReentrantLock
* ArrayBlockingQueue 通过重入锁保证原子性操作。在每一次数组元素操作时,都进行加锁和释放锁处理
final ReentrantLock lock;
1.2,线程通信:Condition
* ArrayBlockingQueue 内部定义了两个 Condition 实例,从下面名称可以看出,分别为 notEmpty 和 notFull。在对数组元素进行操作时,读元素或者写写元素成功后,会默认出发一次 notEmpty/notFull.signal(),唤醒一个等待线程进行元素插入或者元素消费。同样,在读元素或者取元素时,如果数据已满或者数据为空,则会触发 notEmpty/notFull.await() 进行等待,等待元素或者元素消费后进行线程唤醒
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
2,指针操作
* ArrayBlockingQueue 虽然底层也是数组形式,但是和 ArrayList 的元素读取还是有所区别。ArrayBlockingQueue 内部元素排列也是连续性的,但是这个连续性是对于一段下标区间的。元素的插入和移除相对应的会对 takeIndex 和 putIndex 指针进行递增操作,而下一次插入或者移除继续从指针位置开始。比如,对于一个空 ArrayBlockingQueue 队列来讲,第一次插入元素默认 putIndex 为0,则添加到0下标位置,同时 putIndex++。此时如果存在线程进行数据移除,而 takeIndex 同样为0,则移除0下标的元素,同时 takeIndex++。这时候继续添加元素,putIndex 已经修改为1,则添加元素到1下标位置,后续操作相同。这时候数组形态就是0下标为null,1下标有元素,后续皆为null。所以元素读写是一个连续的过程,而元素移除后,同样会对后续部分整体前移,并修改读写指针;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
三,源码分析
1,读源码分析
1.1,peek():获取当前指针元素,不阻塞,不移动指针,存在返回元素,不存在返回null
* peek()
public E peek() {
// 加锁释放锁操作
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 根据读索引获取索引位置元素
return itemAt(takeIndex);
} finally {
lock.unlock();
}
}
* itemAt(int i):返回位置元素,不存在则为null
final E itemAt(int i) {
return (E) items[i];
}
1.2,poll():获取当前指针元素,数组为空返回null,数组不为空返回当前指针元素并移除该元素,指针后移
* poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 数组为空返回null,不为空获取元素
// dequeue:为获取数据底层方法
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
* dequeue():元素读取底层方法
private E dequeue() {
final Object[] items = this.items;
// 从数据中获取读指针下标元素
E x = (E) items[takeIndex];
// 获取并移除在此处体现,读取到后直接置null
items[takeIndex] = null;
// 读指针获取完成后,直接递增,如果已经指向最后一个下标,则回到第一个下标
if (++takeIndex == items.length)
takeIndex = 0;
// 数据元素数量递减
count--;
// 此处迭代器操作,后续迭代器分析
if (itrs != null)
itrs.elementDequeued();
// 元素读取并移除后,元素必定未满,唤醒notFull线程,允许元素插入
notFull.signal();
// 返回元素
return x;
}
1.3,take():获取当前指针元素,数组为空阻塞,数组不为空获取当前指针元素并移除该元素,指针后移
* take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// take方法为空阻塞在此处阻塞,数组元素数量为空,notEmpty线程等待
// 等待数组存在数据插入时,会唤醒该方法
while (count == 0)
notEmpty.await();
// 取数据
return dequeue();
} finally {
lock.unlock();
}
}
2,写源码分析
2.1,add(E e):添加元素,数组满了抛异常,数组未满添加到指针位置
* add()
public boolean add(E e) {
// 向上调用父类方法
return super.add(e);
}
// java.util.AbstractQueue#add
public boolean add(E e) {
// 内部调用offer方法,插入成功返回true
// 插入失败直接抛异常
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
2.2,offer(E e):添加元素,数据满了返回false,数组未满返回true
* offer(E e)
public boolean offer(E e) {
// null值校验,此处为null直接抛空指针异常
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 数组已满,返回false
if (count == items.length)
return false;
else {
// 数据未满,写数据
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
* enqueue(E x):元素入列,此为底层方法
private void enqueue(E x) {
// 获取元素,并添加元素到写指针位置
final Object[] items = this.items;
items[putIndex] = x;
// 写指针到末尾,直接置为头索引,并底层
if (++putIndex == items.length)
putIndex = 0;
// 数组元素数量递增
count++;
// 添加成功后,数组元素不为空,唤醒等待读锁
notEmpty.signal();
}
2.3,put(E e):添加元素,数组满了阻塞,数据未满添加成功
* put(E e)
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 数组已满,则进行等待,等待读数据后唤醒
while (count == items.length)
notFull.await();
// 添加元素
enqueue(e);
} finally {
lock.unlock();
}
}
3,移除源码分析
3.1,remove(Object o):移除元素,移除成功返回true,失败返回false
* remove(Object o)
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 数组存在元素,进行移除操作
if (count > 0) {
// 从读指针取到写指针,也就是取完完整的数据段
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// 遍历到移除数据,直接进行移除操作
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
// 数组不存在元素,直接返回false
return false;
} finally {
lock.unlock();
}
}
* removeAt(final int removeIndex)
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 删除下标等于读指针,直接把该下标元素置空,并递增读指针
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
// 元素数量递减
count--;
if (itrs != null)
itrs.elementDequeued();
// 删除下标元素不等于读指针,说明删除的是数据段内部节点
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
// 首先获取移除元素下标的下一个下标
int next = i + 1;
if (next == items.length)
next = 0;
// 下一个下标位置不等于写指针位置,也就是说明还在数据段内部
if (next != putIndex) {
// 用当前元素覆盖删除元素,并循环处理,此处目的是将后续元素统一前移一位,
items[i] = items[next];
i = next;
} else {
// 所有元素前移完成后,原最后一个元素置空,并将写指针置位该位置
// 因为当前元素已经前移一位
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
3.2,removeAll(Collection<?> c):移除元素组,移除成功返回true,失败返回false
* removeAll(Collection<?> c):父类方法,内部通过迭代循环移除,迭代部分分析
public boolean removeAll(Collection<?> c) {
Objects.requireNonNull(c);
boolean modified = false;
// 迭代后循环移除
Iterator<?> it = iterator();
while (it.hasNext()) {
if (c.contains(it.next())) {
it.remove();
modified = true;
}
}
return modified;
}
4,迭代器源码分析,没分析清,网上资料也比较少,参考少,后续再补充,只能说比较麻烦,涉及参数和状态转变太多
4.1,相关元素
Itr:
// 迭代器下一次迭代的坐标,NONE表示没有
private int cursor;
// 迭代的下一个元素
private E nextItem;
// 迭代的下一个元素下标,null表示-1,被移除表示-2
private int nextIndex;
// 迭代上一个元素
private E lastItem;
// 迭代上一个元素下标,null表示-1,被移除表示-2
private int lastRet;
// 记录上次迭代的下标,迭代器失效时值为 DETACHED
private int prevTakeIndex;
// 记录上次循环次数,和cycles进行对比,可知变化
private int prevCycles;
// 为null时表示
private static final int NONE = -1;
// 元素被调用remove移除
private static final int REMOVED = -2;
// 迭代器过期
private static final int DETACHED = -3;
Itrs:
// 数组列表循环次数,每一次takeIndex到0时加1
int cycles = 0;
// 头结点,该链表后续添加节点为头结点
private Node head;
// 清理失效节点的标识节点
private Node sweeper = null;
// 清理的循环次数
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;
4.2,Itr():迭代器初始化
Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// 数组元素为空,进行参数初始化
if (count == 0) {
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
// 当前读指针位置
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
// 赋值上一次的读指针,该指针位置内部定义,有Itr内部控制
prevTakeIndex = takeIndex;
// 赋值下一个节点元素,初始化时赋值为第一个
nextItem = itemAt(nextIndex = takeIndex);
// 获取下一次迭代坐标,有下一个元素为下一个元素索引,没有返回-1
cursor = incCursor(takeIndex);
// 判断itrs是否已经初始化,未初始化则初始化,初始化则注册
// 此处设计思想有点类似与ThreadLocal
// 每一个 AarrayBlockingQueue 可以初始化多个迭代器,每一个迭代器即Itr在Itrs中以Node的形式存在,
// Itr被包装为弱引用,在GC时如果没有强引用对象关联,则会被回收掉
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this);
// 注册完成后对Itrs内的Node节点做清理
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}
* incCursor(int index):获取下一个迭代位置
private int incCursor(int index) {
// 默认迭代位置是当前索引的下一个位置
// 如果当前索引位置与写指针位置重复,即没有后续节点,直接返回-1
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}
* register(initial):初始化 Itrs 及注册当前迭代器 Itr
// 初始化Itrs
Itrs(Itr initial) {
register(initial);
}
// 注册迭代器到Itrs
void register(Itr itr) {
// 直接构造为头结点,其他节点依次挂next节点
// 该链表会把后续的节点挂在上游节点,即后来居上
head = new Node(itr, head);
}
* doSomeSweeping(boolean tryHarder):Itrs 数据清理,如果存在清理标识,从标识清理;不存在,在全链表清理
void doSomeSweeping(boolean tryHarder) {
// 此处获取清理次数
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
// o 节点表示 p 节点的上一个节点
Node o, p;
// 获取清理节点表示
final Node sweeper = this.sweeper;
// 根据sweeper判断的标志位
boolean passedGo;
// sweeper 为空,则全表扫描清理,o赋null值,p赋头值
// sweeper 不为空,非全表扫描,o赋sweep值,p表示下一个节点
if (sweeper == null) {
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}
// 按照指定的循环次数循环
for (; probes > 0; probes--) {
// p节点为空,表示头结点或者o节点即清理节点的下一个节点为空
if (p == null) {
// passedGo为true,表示p为头节点,头结点为空,直接跳出循环
if (passedGo)
break;
// 表示p为清理节点的下一个节点,为空重置头头结点
o = null;
p = head;
passedGo = true;
}
// 获取节点处理的Itr迭代器,及下一个节点
final Itr it = p.get();
final Node next = p.next;
// 如果迭代器为null(GC)回收
// 或者迭代器已经过期(存在数据操作)进行数据清理
if (it == null || it.isDetached()) {
// 重置次数为最大次数
probes = LONG_SWEEP_PROBES;
// 清理p节点,并重新关联o节点,
p.clear();
p.next = null;
if (o == null) {
head = next;
// 如果不存在o节点也不存在下一个节点,则Itrs内没有数据,直接置空
if (next == null) {
itrs = null;
return;
}
}
else
o.next = next;
} else {
// 节点未失效,继续向下寻找
o = p;
}
p = next;
}
// 定义清理标识位置
this.sweeper = (p == null) ? null : o;
}
* isDetached():迭代器过期判断
boolean isDetached() {
// prevTakeIndex小于0,说明线程已经对数据进行变更,并将该迭代器的状态置位过期
return prevTakeIndex < 0;
}
4.3,elementDequeued():takeIndex变更引起的变更处理
* elementDequeued():取数据触发
void elementDequeued() {
// 队列中元素为空,进行空处理
if (count == 0)
queueIsEmpty();
// 读指针为0,说明已经走过一次循环
else if (takeIndex == 0)
takeIndexWrapped();
}
* queueIsEmpty()
void queueIsEmpty() {
// 对Itrs进行置null
// Itrs内部包装节点Node进行清空并关闭
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}
* takeIndexWrapped()
// java.util.concurrent.ArrayBlockingQueue.Itrs#takeIndexWrapped
void takeIndexWrapped() {
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
// 进行过期判断及处理,此处设置获取,取数据时如果满足重置条件会重置
if (it == null || it.takeIndexWrapped()) {
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}
4.4,hasNext():判断下一个元素
* hasNext()
public boolean hasNext() {
// Itr初始化时,已经对nextItem进行赋值,如果不为空,说明存在,返回true
if (nextItem != null)
return true;
// 下一个节点不存在处理
noNext();
return false;
}
* noNext()
private void noNext() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// 判断当前迭代器是否已经过期
// 没有过期继续操作
if (!isDetached()) {
// 判断元素是否发生移动,移动后判断是否过期及对应参数重新赋值
incorporateDequeues();
if (lastRet >= 0) {
// 获取上一个迭代元素,并失效当前Itr
lastItem = itemAt(lastRet);
detach();
}
}
} finally {
lock.unlock();
}
}
* incorporateDequeues()
private void incorporateDequeues() {
// 当前循环次数
final int cycles = itrs.cycles;
// 读指针索引,当前读指针索引
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
// 上一次循环次数
final int prevCycles = this.prevCycles;
// 迭代时指针索引,指创建迭代器时候的初始索引,后续可能会被变更
final int prevTakeIndex = this.prevTakeIndex;
// 如果循环次数不一致或者指针索引不一致,说明发生了数据偏移
if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// 获取数据偏移量
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);
// 判断上一个获取节点是否需要失效
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
// 判断下一个节点是否需要失效
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
// 判断是否存在下一个迭代元素
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;
// 需要失效处理,则失效
if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
// 不失效,则重置上一次值为当前值
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}
* invalidated(int index, int prevTakeIndex, long dequeues, int length):失效算法判断,目前没搞懂
private boolean invalidated(int index, int prevTakeIndex, long dequeues, int length) {
if (index < 0)
return false;
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return dequeues > distance;
}
4.5,next():获取下一个元素
public E next() {
// 迭代器初始化后,如果此时将队列清空,因为nextItem已经初始化,此时依旧会返回初始化迭代时候的第一个元素
// 下一个元素不存在,直接空指针
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// 迭代器未失效,进行数据偏移矫正或者直接失效
if (!isDetached())
incorporateDequeues();
// 重置上一个节点及下一个节点的下标和元素
lastRet = nextIndex;
final int cursor = this.cursor;
// 存在下一个坐标
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
// 下一次迭代坐标递增
this.cursor = incCursor(cursor);
// 不存在下一个坐标,直接对相关元素置空
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}
4.6,remove():移除当前元素
* remove()
public void remove() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// 如果没有过期,进行元素位置矫正
if (!isDetached())
incorporateDequeues();
// 获取上一个获取的元素坐标
// Itr初始化时候,lastRet已经初始化为-1
// 调用next():会对lastRet进行重赋值,赋当时的next值
final int lastRet = this.lastRet;
this.lastRet = NONE;
// lastRet >= 0,说明已经调用next初始化
if (lastRet >= 0) {
// Itr没有过期,直接移除
if (!isDetached())
// 调用ArrayBlockingQueue移除
removeAt(lastRet);
else {
// Itr已经过期,对lastItem进行处理
final E lastItem = this.lastItem;
this.lastItem = null;
if (itemAt(lastRet) == lastItem)
removeAt(lastRet);
}
// 为NONE,说明没有初始化
} else if (lastRet == NONE)
throw new IllegalStateException();
// 不存在下一个元素,直接失效
if (cursor < 0 && nextIndex < 0)
detach();
} finally {
lock.unlock();
}
}
* removedAt(int removedIndex):调用Itrs移除
// java.util.concurrent.ArrayBlockingQueue.Itrs#removedAt
void removedAt(int removedIndex) {
// 遍历Itrs中整串Itr
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
// Itr 为null,说明已经被GC回收,直接清空回收处理
// Itr不为空,判断是否可以移除
if (it == null || it.removedAt(removedIndex)) {
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
// head为null,说明已经遍历完成,则直接置空Itrs
if (head == null)
itrs = null;
}
* removedAt(int removedIndex):调用Itr判断是否可以移除,没懂,有空再分析吧
// java.util.concurrent.ArrayBlockingQueue.Itr#removedAt
boolean removedAt(int removedIndex) {
if (isDetached())
return true;
final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
final int len = items.length;
int cycleDiff = cycles - prevCycles;
if (removedIndex < takeIndex)
cycleDiff++;
final int removedDistance =
(cycleDiff * len) + (removedIndex - prevTakeIndex);
int cursor = this.cursor;
if (cursor >= 0) {
int x = distance(cursor, prevTakeIndex, len);
if (x == removedDistance) {
if (cursor == putIndex)
this.cursor = cursor = NONE;
}
else if (x > removedDistance) {
this.cursor = cursor = dec(cursor);
}
}
int lastRet = this.lastRet;
if (lastRet >= 0) {
int x = distance(lastRet, prevTakeIndex, len);
if (x == removedDistance)
this.lastRet = lastRet = REMOVED;
else if (x > removedDistance)
this.lastRet = lastRet = dec(lastRet);
}
int nextIndex = this.nextIndex;
if (nextIndex >= 0) {
int x = distance(nextIndex, prevTakeIndex, len);
if (x == removedDistance)
this.nextIndex = nextIndex = REMOVED;
else if (x > removedDistance)
this.nextIndex = nextIndex = dec(nextIndex);
}
else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
this.prevTakeIndex = DETACHED;
return true;
}
return false;
}
最后
以上就是正直大碗为你收集整理的并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析的全部内容,希望文章能够帮你解决并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复