我是靠谱客的博主 正直大碗,最近开发中收集的这篇文章主要介绍并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一,关联源码链接

    * 并发编程(四):AbstractQueuedSynchronizer源码分析

    * 并发编程(五):AQS之重入锁ReentrantLock

   * 并发编程(七):AQS之Condition

二,ArrayBlockingQueue 概述

1,ArrayBlockingQueue

    ArrayBlockingQueue 是通过数组实现的FIFO(先进先出)有界阻塞队列,数组长度在初始化时指定并固定,不存在数组扩容。ArrayBlockingQueue 内部通过重入锁 ReentrantLock 进行线程加锁,保证数据原子性。阻塞及释放通过两个 Condition 构成 notEmpty notFull 两个实例,通过 Condition 线程通信实现加数据和取数据的线程阻塞及线程唤醒。ArrayBlockingQueue 元素获取基于指针操作,内部维护 putIndex takeIndex 两个指针,对于移除元素操作和添加元素元素都由着两个指针控制操作索引,并顺序后移,具体源码分析;

2,类图

3,常用API

// 初始化,通过构造器重载处理
public ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c);
// 取元素:
// 获取当前元素,不阻塞,不移动指针
public E peek();
// 获取当前元素,队列为空返回null,队列不为空返回元素,获取后移除
public E poll();
// 获取当前元素,队列为空阻塞,不为空返回元素,获取后移除
public E take() throws InterruptedException;
// 写元素:
// 添加元素到队列,队列未满插入成功,队列满了直接抛异常
public boolean add(E e);
// 添加元素到队列,队列未满插入成功,队列满了插入失败,返回true/false
public boolean offer(E e);
// 添加元素到队列,队列未满插入成功,队列满了阻塞插入
public void put(E e) throws InterruptedException;
// 移除元素
public boolean remove(Object o);
public boolean removeAll(Collection<?> c);
// 迭代器部分
// 构建迭代
public Iterator<E> iterator();
// 迭代_是否存在下一个
public boolean hasNext();
// 迭代_获取下一个
public E next();
// 迭代_移除
public void remove();

4,功能DEMO

    * 此处只是简单演示元素插入和元素获取,以及元素插入和获取关联影响到的阻塞处理

package com.asdc.mtTest_1;

import javax.swing.*;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author LiYanBin
 * @create 2019-10-23 17:57
 **/
public class ArrayBlockTest {

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1);
        new Thread(() -> {
            try {
                arrayBlockingQueue.add("add方式");
                System.out.println("获取不移除:" + arrayBlockingQueue.peek());
                System.out.println("获取并移除:" + arrayBlockingQueue.poll());
                System.out.println("阻塞获取前:" + System.currentTimeMillis());
                System.out.println("阻塞获取:" + arrayBlockingQueue.take());
                System.out.println("阻塞获取后:" + System.currentTimeMillis());
                // 沉睡足够时间,等待线程2阻塞插入等待后,消费一次,让阻塞插入成功
                Thread.sleep(5000);
                System.out.println("获取元素,释放阻塞插入:" + arrayBlockingQueue.take());
                System.out.println("获取阻塞插入元素:" + arrayBlockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        Thread.sleep(3000);
        new Thread(() -> {
            try {
                System.out.println("添加元素,满后抛异常:" + arrayBlockingQueue.add("add方式"));
                System.out.println("add元素被阻塞获取获取后并释放,则offer为true");
                System.out.println("添加元素,未满返回true,满后返回false:" + arrayBlockingQueue.offer("offer方式"));
                System.out.println("阻塞插入前," + System.currentTimeMillis());
                arrayBlockingQueue.put("put方式");
                System.out.println("阻塞插入后," + System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

    }

}

二,锁操作及指针操作

1,锁操作

1.1,加锁处理:ReentrantLock

    * ArrayBlockingQueue 通过重入锁保证原子性操作。在每一次数组元素操作时,都进行加锁和释放锁处理

final ReentrantLock lock;

1.2,线程通信:Condition

    * ArrayBlockingQueue 内部定义了两个 Condition 实例,从下面名称可以看出,分别为 notEmptynotFull。在对数组元素进行操作时,读元素或者写写元素成功后,会默认出发一次 notEmpty/notFull.signal(),唤醒一个等待线程进行元素插入或者元素消费。同样,在读元素或者取元素时,如果数据已满或者数据为空,则会触发 notEmpty/notFull.await() 进行等待,等待元素或者元素消费后进行线程唤醒

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

2,指针操作

    * ArrayBlockingQueue 虽然底层也是数组形式,但是和 ArrayList 的元素读取还是有所区别。ArrayBlockingQueue 内部元素排列也是连续性的,但是这个连续性是对于一段下标区间的。元素的插入和移除相对应的会对 takeIndex 和 putIndex 指针进行递增操作,而下一次插入或者移除继续从指针位置开始。比如,对于一个空 ArrayBlockingQueue 队列来讲,第一次插入元素默认 putIndex 为0,则添加到0下标位置,同时 putIndex++。此时如果存在线程进行数据移除,而 takeIndex 同样为0,则移除0下标的元素,同时 takeIndex++。这时候继续添加元素,putIndex 已经修改为1,则添加元素到1下标位置,后续操作相同。这时候数组形态就是0下标为null,1下标有元素,后续皆为null。所以元素读写是一个连续的过程,而元素移除后,同样会对后续部分整体前移,并修改读写指针;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

三,源码分析

1,读源码分析

1.1,peek():获取当前指针元素,不阻塞,不移动指针,存在返回元素,不存在返回null

    * peek()

public E peek() {
	// 加锁释放锁操作
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 根据读索引获取索引位置元素
		return itemAt(takeIndex);
	} finally {
		lock.unlock();
	}
}

    * itemAt(int i):返回位置元素,不存在则为null

final E itemAt(int i) {
	return (E) items[i];
}

1.2,poll():获取当前指针元素,数组为空返回null,数组不为空返回当前指针元素并移除该元素,指针后移

    * poll()

public E poll() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 数组为空返回null,不为空获取元素
		// dequeue:为获取数据底层方法
		return (count == 0) ? null : dequeue();
	} finally {
		lock.unlock();
	}
}

    * dequeue():元素读取底层方法

private E dequeue() {
	final Object[] items = this.items;
	// 从数据中获取读指针下标元素
	E x = (E) items[takeIndex];
	// 获取并移除在此处体现,读取到后直接置null
	items[takeIndex] = null;
	// 读指针获取完成后,直接递增,如果已经指向最后一个下标,则回到第一个下标
	if (++takeIndex == items.length)
		takeIndex = 0;
	// 数据元素数量递减
	count--;
	// 此处迭代器操作,后续迭代器分析
	if (itrs != null)
		itrs.elementDequeued();
	// 元素读取并移除后,元素必定未满,唤醒notFull线程,允许元素插入
	notFull.signal();
	// 返回元素
	return x;
}

1.3,take():获取当前指针元素,数组为空阻塞,数组不为空获取当前指针元素并移除该元素,指针后移

    * take()

public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// take方法为空阻塞在此处阻塞,数组元素数量为空,notEmpty线程等待
		// 等待数组存在数据插入时,会唤醒该方法
		while (count == 0)
			notEmpty.await();
		// 取数据
		return dequeue();
	} finally {
		lock.unlock();
	}
}

2,写源码分析

2.1,add(E e):添加元素,数组满了抛异常,数组未满添加到指针位置

    * add()

public boolean add(E e) {
	// 向上调用父类方法
	return super.add(e);
}
// java.util.AbstractQueue#add
public boolean add(E e) {
	// 内部调用offer方法,插入成功返回true
	// 插入失败直接抛异常
	if (offer(e))
		return true;
	else
		throw new IllegalStateException("Queue full");
}

2.2,offer(E e):添加元素,数据满了返回false,数组未满返回true

    * offer(E e)

public boolean offer(E e) {
	// null值校验,此处为null直接抛空指针异常
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 数组已满,返回false
		if (count == items.length)
			return false;
		else {
			// 数据未满,写数据
			enqueue(e);
			return true;
		}
	} finally {
		lock.unlock();
	}
}

    * enqueue(E x):元素入列,此为底层方法

private void enqueue(E x) {
	// 获取元素,并添加元素到写指针位置
	final Object[] items = this.items;
	items[putIndex] = x;
	// 写指针到末尾,直接置为头索引,并底层
	if (++putIndex == items.length)
		putIndex = 0;
	// 数组元素数量递增
	count++;
	// 添加成功后,数组元素不为空,唤醒等待读锁
	notEmpty.signal();
}

2.3,put(E e):添加元素,数组满了阻塞,数据未满添加成功

    * put(E e)

public void put(E e) throws InterruptedException {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		// 数组已满,则进行等待,等待读数据后唤醒
		while (count == items.length)
			notFull.await();
		// 添加元素
		enqueue(e);
	} finally {
		lock.unlock();
	}
}

3,移除源码分析

3.1,remove(Object o):移除元素,移除成功返回true,失败返回false

   * remove(Object o)

public boolean remove(Object o) {
	if (o == null) return false;
	final Object[] items = this.items;
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		// 数组存在元素,进行移除操作
		if (count > 0) {
			// 从读指针取到写指针,也就是取完完整的数据段
			final int putIndex = this.putIndex;
			int i = takeIndex;
			do {
				// 遍历到移除数据,直接进行移除操作
				if (o.equals(items[i])) {
					removeAt(i);
					return true;
				}
				if (++i == items.length)
					i = 0;
			} while (i != putIndex);
		}
		// 数组不存在元素,直接返回false
		return false;
	} finally {
		lock.unlock();
	}
}

    * removeAt(final int removeIndex)

void removeAt(final int removeIndex) {
	final Object[] items = this.items;
	// 删除下标等于读指针,直接把该下标元素置空,并递增读指针
	if (removeIndex == takeIndex) {
		items[takeIndex] = null;
		if (++takeIndex == items.length)
			takeIndex = 0;
		// 元素数量递减
		count--;
		if (itrs != null)
			itrs.elementDequeued();
	// 删除下标元素不等于读指针,说明删除的是数据段内部节点
	} else {
		final int putIndex = this.putIndex;
		for (int i = removeIndex;;) {
			// 首先获取移除元素下标的下一个下标
			int next = i + 1;
			if (next == items.length)
				next = 0;
			// 下一个下标位置不等于写指针位置,也就是说明还在数据段内部
			if (next != putIndex) {
				// 用当前元素覆盖删除元素,并循环处理,此处目的是将后续元素统一前移一位,
				items[i] = items[next];
				i = next;
			} else {
				// 所有元素前移完成后,原最后一个元素置空,并将写指针置位该位置
				// 因为当前元素已经前移一位
				items[i] = null;
				this.putIndex = i;
				break;
			}
		}
		count--;
		if (itrs != null)
			itrs.removedAt(removeIndex);
	}
	notFull.signal();
}

3.2,removeAll(Collection<?> c):移除元素组,移除成功返回true,失败返回false

    * removeAll(Collection<?> c):父类方法,内部通过迭代循环移除,迭代部分分析

public boolean removeAll(Collection<?> c) {
	Objects.requireNonNull(c);
	boolean modified = false;
	// 迭代后循环移除
	Iterator<?> it = iterator();
	while (it.hasNext()) {
		if (c.contains(it.next())) {
			it.remove();
			modified = true;
		}
	}
	return modified;
}

4,迭代器源码分析,没分析清,网上资料也比较少,参考少,后续再补充,只能说比较麻烦,涉及参数和状态转变太多

4.1,相关元素

Itr:
// 迭代器下一次迭代的坐标,NONE表示没有
private int cursor;

// 迭代的下一个元素
private E nextItem;

// 迭代的下一个元素下标,null表示-1,被移除表示-2
private int nextIndex;

// 迭代上一个元素
private E lastItem;

// 迭代上一个元素下标,null表示-1,被移除表示-2
private int lastRet;

// 记录上次迭代的下标,迭代器失效时值为 DETACHED
private int prevTakeIndex;

// 记录上次循环次数,和cycles进行对比,可知变化
private int prevCycles;

// 为null时表示
private static final int NONE = -1;

// 元素被调用remove移除
private static final int REMOVED = -2;

// 迭代器过期
private static final int DETACHED = -3;

Itrs:
// 数组列表循环次数,每一次takeIndex到0时加1
int cycles = 0;

// 头结点,该链表后续添加节点为头结点
private Node head;

// 清理失效节点的标识节点
private Node sweeper = null;

// 清理的循环次数
private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;

4.2,Itr():迭代器初始化

Itr() {
	// assert lock.getHoldCount() == 0;
	lastRet = NONE;
	final ReentrantLock lock = ArrayBlockingQueue.this.lock;
	lock.lock();
	try {
		// 数组元素为空,进行参数初始化
		if (count == 0) {
			cursor = NONE;
			nextIndex = NONE;
			prevTakeIndex = DETACHED;
		} else {
			// 当前读指针位置
			final int takeIndex = ArrayBlockingQueue.this.takeIndex;
			// 赋值上一次的读指针,该指针位置内部定义,有Itr内部控制
			prevTakeIndex = takeIndex;
			// 赋值下一个节点元素,初始化时赋值为第一个
			nextItem = itemAt(nextIndex = takeIndex);
			// 获取下一次迭代坐标,有下一个元素为下一个元素索引,没有返回-1
			cursor = incCursor(takeIndex);
			// 判断itrs是否已经初始化,未初始化则初始化,初始化则注册
			// 此处设计思想有点类似与ThreadLocal
			// 每一个 AarrayBlockingQueue 可以初始化多个迭代器,每一个迭代器即Itr在Itrs中以Node的形式存在,
			// Itr被包装为弱引用,在GC时如果没有强引用对象关联,则会被回收掉
			if (itrs == null) {
				itrs = new Itrs(this);
			} else {
				itrs.register(this);
				// 注册完成后对Itrs内的Node节点做清理
				itrs.doSomeSweeping(false);
			}
			prevCycles = itrs.cycles;
		}
	} finally {
		lock.unlock();
	}
}

    * incCursor(int index):获取下一个迭代位置

private int incCursor(int index) {
	// 默认迭代位置是当前索引的下一个位置
	// 如果当前索引位置与写指针位置重复,即没有后续节点,直接返回-1
	if (++index == items.length)
		index = 0;
	if (index == putIndex)
		index = NONE;
	return index;
}

    * register(initial):初始化 Itrs 及注册当前迭代器 Itr

// 初始化Itrs
Itrs(Itr initial) {
	register(initial);
}
// 注册迭代器到Itrs
void register(Itr itr) {
	// 直接构造为头结点,其他节点依次挂next节点
	// 该链表会把后续的节点挂在上游节点,即后来居上
	head = new Node(itr, head);
}

    * doSomeSweeping(boolean tryHarder):Itrs 数据清理,如果存在清理标识,从标识清理;不存在,在全链表清理

void doSomeSweeping(boolean tryHarder) {
	// 此处获取清理次数
	int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
	// o 节点表示 p 节点的上一个节点
	Node o, p;
	// 获取清理节点表示
	final Node sweeper = this.sweeper;
	// 根据sweeper判断的标志位
	boolean passedGo;

	// sweeper 为空,则全表扫描清理,o赋null值,p赋头值
	// sweeper 不为空,非全表扫描,o赋sweep值,p表示下一个节点
	if (sweeper == null) {
		o = null;
		p = head;
		passedGo = true;
	} else {
		o = sweeper;
		p = o.next;
		passedGo = false;
	}

	// 按照指定的循环次数循环
	for (; probes > 0; probes--) {
		// p节点为空,表示头结点或者o节点即清理节点的下一个节点为空
		if (p == null) {
			// passedGo为true,表示p为头节点,头结点为空,直接跳出循环
			if (passedGo)
				break;
			// 表示p为清理节点的下一个节点,为空重置头头结点
			o = null;
			p = head;
			passedGo = true;
		}
		// 获取节点处理的Itr迭代器,及下一个节点
		final Itr it = p.get();
		final Node next = p.next;
		// 如果迭代器为null(GC)回收
		// 或者迭代器已经过期(存在数据操作)进行数据清理
		if (it == null || it.isDetached()) {
			// 重置次数为最大次数
			probes = LONG_SWEEP_PROBES; 
			// 清理p节点,并重新关联o节点,
			p.clear();
			p.next = null;
			if (o == null) {
				head = next;
				// 如果不存在o节点也不存在下一个节点,则Itrs内没有数据,直接置空
				if (next == null) {
					itrs = null;
					return;
				}
			}
			else
				o.next = next;
		} else {
			// 节点未失效,继续向下寻找
			o = p;
		}
		p = next;
	}
	// 定义清理标识位置
	this.sweeper = (p == null) ? null : o;
}

    * isDetached():迭代器过期判断

boolean isDetached() {
	// prevTakeIndex小于0,说明线程已经对数据进行变更,并将该迭代器的状态置位过期
	return prevTakeIndex < 0;
}

4.3,elementDequeued():takeIndex变更引起的变更处理

    * elementDequeued():取数据触发

void elementDequeued() {
	// 队列中元素为空,进行空处理
	if (count == 0)
		queueIsEmpty();
	// 读指针为0,说明已经走过一次循环
	else if (takeIndex == 0)
		takeIndexWrapped();
}

    * queueIsEmpty()

void queueIsEmpty() {
	// 对Itrs进行置null
	// Itrs内部包装节点Node进行清空并关闭
	for (Node p = head; p != null; p = p.next) {
		Itr it = p.get();
		if (it != null) {
			p.clear();
			it.shutdown();
		}
	}
	head = null;
	itrs = null;
}

    * takeIndexWrapped()

// java.util.concurrent.ArrayBlockingQueue.Itrs#takeIndexWrapped
void takeIndexWrapped() {
	cycles++;
	for (Node o = null, p = head; p != null;) {
		final Itr it = p.get();
		final Node next = p.next;
		// 进行过期判断及处理,此处设置获取,取数据时如果满足重置条件会重置
		if (it == null || it.takeIndexWrapped()) {
			p.clear();
			p.next = null;
			if (o == null)
				head = next;
			else
				o.next = next;
		} else {
			o = p;
		}
		p = next;
	}
	if (head == null)   // no more iterators to track
		itrs = null;
}

4.4,hasNext():判断下一个元素

    * hasNext()

public boolean hasNext() {
	// Itr初始化时,已经对nextItem进行赋值,如果不为空,说明存在,返回true
	if (nextItem != null)
		return true;
	// 下一个节点不存在处理
	noNext();
	return false;
}

    * noNext()

private void noNext() {
	final ReentrantLock lock = ArrayBlockingQueue.this.lock;
	lock.lock();
	try {
		// 判断当前迭代器是否已经过期
		// 没有过期继续操作
		if (!isDetached()) {
			// 判断元素是否发生移动,移动后判断是否过期及对应参数重新赋值
			incorporateDequeues();
			if (lastRet >= 0) {
                // 获取上一个迭代元素,并失效当前Itr
				lastItem = itemAt(lastRet);
				detach();
			}
		}
	} finally {
		lock.unlock();
	}
}

    * incorporateDequeues()

private void incorporateDequeues() {
	// 当前循环次数
	final int cycles = itrs.cycles;
	// 读指针索引,当前读指针索引
	final int takeIndex = ArrayBlockingQueue.this.takeIndex;
	// 上一次循环次数
	final int prevCycles = this.prevCycles;
	// 迭代时指针索引,指创建迭代器时候的初始索引,后续可能会被变更
	final int prevTakeIndex = this.prevTakeIndex;

	// 如果循环次数不一致或者指针索引不一致,说明发生了数据偏移
	if (cycles != prevCycles || takeIndex != prevTakeIndex) {
		final int len = items.length;
		// 获取数据偏移量
		long dequeues = (cycles - prevCycles) * len
			+ (takeIndex - prevTakeIndex);

		// 判断上一个获取节点是否需要失效
		if (invalidated(lastRet, prevTakeIndex, dequeues, len))
			lastRet = REMOVED;
        // 判断下一个节点是否需要失效
		if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
			nextIndex = REMOVED;
        // 判断是否存在下一个迭代元素
		if (invalidated(cursor, prevTakeIndex, dequeues, len))
			cursor = takeIndex;
		
		// 需要失效处理,则失效
		if (cursor < 0 && nextIndex < 0 && lastRet < 0)
			detach();
		// 不失效,则重置上一次值为当前值
		else {
			this.prevCycles = cycles;
			this.prevTakeIndex = takeIndex;
		}
	}
}

    * invalidated(int index, int prevTakeIndex, long dequeues, int length):失效算法判断,目前没搞懂

private boolean invalidated(int index, int prevTakeIndex, long dequeues, int length) {
	if (index < 0)
		return false;
	int distance = index - prevTakeIndex;
	if (distance < 0)
		distance += length;
	return dequeues > distance;
}

4.5,next():获取下一个元素

public E next() {
	// 迭代器初始化后,如果此时将队列清空,因为nextItem已经初始化,此时依旧会返回初始化迭代时候的第一个元素
	// 下一个元素不存在,直接空指针
	final E x = nextItem;
	if (x == null)
		throw new NoSuchElementException();
	final ReentrantLock lock = ArrayBlockingQueue.this.lock;
	lock.lock();
	try {
		// 迭代器未失效,进行数据偏移矫正或者直接失效
		if (!isDetached())
			incorporateDequeues();
		// 重置上一个节点及下一个节点的下标和元素
		lastRet = nextIndex;
		final int cursor = this.cursor;
		// 存在下一个坐标
		if (cursor >= 0) {
			nextItem = itemAt(nextIndex = cursor);
			// 下一次迭代坐标递增
			this.cursor = incCursor(cursor);
		// 不存在下一个坐标,直接对相关元素置空
		} else {
			nextIndex = NONE;
			nextItem = null;
		}
	} finally {
		lock.unlock();
	}
	return x;
}

4.6,remove():移除当前元素

    * remove()

public void remove() {
	final ReentrantLock lock = ArrayBlockingQueue.this.lock;
	lock.lock();
	try {
		// 如果没有过期,进行元素位置矫正
		if (!isDetached())
			incorporateDequeues();
		// 获取上一个获取的元素坐标
		// Itr初始化时候,lastRet已经初始化为-1
		// 调用next():会对lastRet进行重赋值,赋当时的next值
		final int lastRet = this.lastRet;
		this.lastRet = NONE;
		// lastRet >= 0,说明已经调用next初始化
		if (lastRet >= 0) {
			// Itr没有过期,直接移除
			if (!isDetached())
                // 调用ArrayBlockingQueue移除
				removeAt(lastRet);
			else {
				// Itr已经过期,对lastItem进行处理
				final E lastItem = this.lastItem;
				this.lastItem = null;
				if (itemAt(lastRet) == lastItem)
					removeAt(lastRet);
			}
		// 为NONE,说明没有初始化
		} else if (lastRet == NONE)
			throw new IllegalStateException();
		// 不存在下一个元素,直接失效
		if (cursor < 0 && nextIndex < 0)
			detach();
	} finally {
		lock.unlock();
	}
}

    * removedAt(int removedIndex):调用Itrs移除

// java.util.concurrent.ArrayBlockingQueue.Itrs#removedAt
void removedAt(int removedIndex) {
	// 遍历Itrs中整串Itr
	for (Node o = null, p = head; p != null;) {
		final Itr it = p.get();
		final Node next = p.next;
		// Itr 为null,说明已经被GC回收,直接清空回收处理
		// Itr不为空,判断是否可以移除
		if (it == null || it.removedAt(removedIndex)) {
			p.clear();
			p.next = null;
			if (o == null)
				head = next;
			else
				o.next = next;
		} else {
			o = p;
		}
		p = next;
	}
	// head为null,说明已经遍历完成,则直接置空Itrs
	if (head == null)
		itrs = null;
}

    * removedAt(int removedIndex):调用Itr判断是否可以移除,没懂,有空再分析吧

// java.util.concurrent.ArrayBlockingQueue.Itr#removedAt
boolean removedAt(int removedIndex) {
	if (isDetached())
		return true;

	final int cycles = itrs.cycles;
	final int takeIndex = ArrayBlockingQueue.this.takeIndex;
	final int prevCycles = this.prevCycles;
	final int prevTakeIndex = this.prevTakeIndex;
	final int len = items.length;
	int cycleDiff = cycles - prevCycles;
	if (removedIndex < takeIndex)
		cycleDiff++;
	final int removedDistance =
		(cycleDiff * len) + (removedIndex - prevTakeIndex);
	int cursor = this.cursor;
	if (cursor >= 0) {
		int x = distance(cursor, prevTakeIndex, len);
		if (x == removedDistance) {
			if (cursor == putIndex)
				this.cursor = cursor = NONE;
		}
		else if (x > removedDistance) {
			this.cursor = cursor = dec(cursor);
		}
	}
	int lastRet = this.lastRet;
	if (lastRet >= 0) {
		int x = distance(lastRet, prevTakeIndex, len);
		if (x == removedDistance)
			this.lastRet = lastRet = REMOVED;
		else if (x > removedDistance)
			this.lastRet = lastRet = dec(lastRet);
	}
	int nextIndex = this.nextIndex;
	if (nextIndex >= 0) {
		int x = distance(nextIndex, prevTakeIndex, len);
		if (x == removedDistance)
			this.nextIndex = nextIndex = REMOVED;
		else if (x > removedDistance)
			this.nextIndex = nextIndex = dec(nextIndex);
	}
	else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
		this.prevTakeIndex = DETACHED;
		return true;
	}
	return false;
}

 

最后

以上就是正直大碗为你收集整理的并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析的全部内容,希望文章能够帮你解决并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部