我是靠谱客的博主 正直大碗,这篇文章主要介绍并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析,现在分享给大家,希望可以做个参考。

一,关联源码链接

    * 并发编程(四):AbstractQueuedSynchronizer源码分析

    * 并发编程(五):AQS之重入锁ReentrantLock

   * 并发编程(七):AQS之Condition

二,ArrayBlockingQueue 概述

1,ArrayBlockingQueue

    ArrayBlockingQueue 是通过数组实现的FIFO(先进先出)有界阻塞队列,数组长度在初始化时指定并固定,不存在数组扩容。ArrayBlockingQueue 内部通过重入锁 ReentrantLock 进行线程加锁,保证数据原子性。阻塞及释放通过两个 Condition 构成 notEmpty notFull 两个实例,通过 Condition 线程通信实现加数据和取数据的线程阻塞及线程唤醒。ArrayBlockingQueue 元素获取基于指针操作,内部维护 putIndex takeIndex 两个指针,对于移除元素操作和添加元素元素都由着两个指针控制操作索引,并顺序后移,具体源码分析;

2,类图

3,常用API

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 初始化,通过构造器重载处理 public ArrayBlockingQueue(int capacity); ArrayBlockingQueue(int capacity, boolean fair); public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c); // 取元素: // 获取当前元素,不阻塞,不移动指针 public E peek(); // 获取当前元素,队列为空返回null,队列不为空返回元素,获取后移除 public E poll(); // 获取当前元素,队列为空阻塞,不为空返回元素,获取后移除 public E take() throws InterruptedException; // 写元素: // 添加元素到队列,队列未满插入成功,队列满了直接抛异常 public boolean add(E e); // 添加元素到队列,队列未满插入成功,队列满了插入失败,返回true/false public boolean offer(E e); // 添加元素到队列,队列未满插入成功,队列满了阻塞插入 public void put(E e) throws InterruptedException; // 移除元素 public boolean remove(Object o); public boolean removeAll(Collection<?> c); // 迭代器部分 // 构建迭代 public Iterator<E> iterator(); // 迭代_是否存在下一个 public boolean hasNext(); // 迭代_获取下一个 public E next(); // 迭代_移除 public void remove();

4,功能DEMO

    * 此处只是简单演示元素插入和元素获取,以及元素插入和获取关联影响到的阻塞处理

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.asdc.mtTest_1; import javax.swing.*; import java.util.concurrent.ArrayBlockingQueue; /** * @author LiYanBin * @create 2019-10-23 17:57 **/ public class ArrayBlockTest { public static void main(String[] args) throws InterruptedException { ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<>(1); new Thread(() -> { try { arrayBlockingQueue.add("add方式"); System.out.println("获取不移除:" + arrayBlockingQueue.peek()); System.out.println("获取并移除:" + arrayBlockingQueue.poll()); System.out.println("阻塞获取前:" + System.currentTimeMillis()); System.out.println("阻塞获取:" + arrayBlockingQueue.take()); System.out.println("阻塞获取后:" + System.currentTimeMillis()); // 沉睡足够时间,等待线程2阻塞插入等待后,消费一次,让阻塞插入成功 Thread.sleep(5000); System.out.println("获取元素,释放阻塞插入:" + arrayBlockingQueue.take()); System.out.println("获取阻塞插入元素:" + arrayBlockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); Thread.sleep(3000); new Thread(() -> { try { System.out.println("添加元素,满后抛异常:" + arrayBlockingQueue.add("add方式")); System.out.println("add元素被阻塞获取获取后并释放,则offer为true"); System.out.println("添加元素,未满返回true,满后返回false:" + arrayBlockingQueue.offer("offer方式")); System.out.println("阻塞插入前," + System.currentTimeMillis()); arrayBlockingQueue.put("put方式"); System.out.println("阻塞插入后," + System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } }).start(); } }

二,锁操作及指针操作

1,锁操作

1.1,加锁处理:ReentrantLock

    * ArrayBlockingQueue 通过重入锁保证原子性操作。在每一次数组元素操作时,都进行加锁和释放锁处理

复制代码
1
final ReentrantLock lock;

1.2,线程通信:Condition

    * ArrayBlockingQueue 内部定义了两个 Condition 实例,从下面名称可以看出,分别为 notEmptynotFull。在对数组元素进行操作时,读元素或者写写元素成功后,会默认出发一次 notEmpty/notFull.signal(),唤醒一个等待线程进行元素插入或者元素消费。同样,在读元素或者取元素时,如果数据已满或者数据为空,则会触发 notEmpty/notFull.await() 进行等待,等待元素或者元素消费后进行线程唤醒

复制代码
1
2
3
4
5
/** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;

2,指针操作

    * ArrayBlockingQueue 虽然底层也是数组形式,但是和 ArrayList 的元素读取还是有所区别。ArrayBlockingQueue 内部元素排列也是连续性的,但是这个连续性是对于一段下标区间的。元素的插入和移除相对应的会对 takeIndex 和 putIndex 指针进行递增操作,而下一次插入或者移除继续从指针位置开始。比如,对于一个空 ArrayBlockingQueue 队列来讲,第一次插入元素默认 putIndex 为0,则添加到0下标位置,同时 putIndex++。此时如果存在线程进行数据移除,而 takeIndex 同样为0,则移除0下标的元素,同时 takeIndex++。这时候继续添加元素,putIndex 已经修改为1,则添加元素到1下标位置,后续操作相同。这时候数组形态就是0下标为null,1下标有元素,后续皆为null。所以元素读写是一个连续的过程,而元素移除后,同样会对后续部分整体前移,并修改读写指针;

复制代码
1
2
3
4
5
/** items index for next take, poll, peek or remove */ int takeIndex; /** items index for next put, offer, or add */ int putIndex;

三,源码分析

1,读源码分析

1.1,peek():获取当前指针元素,不阻塞,不移动指针,存在返回元素,不存在返回null

    * peek()

复制代码
1
2
3
4
5
6
7
8
9
10
11
public E peek() { // 加锁释放锁操作 final ReentrantLock lock = this.lock; lock.lock(); try { // 根据读索引获取索引位置元素 return itemAt(takeIndex); } finally { lock.unlock(); } }

    * itemAt(int i):返回位置元素,不存在则为null

复制代码
1
2
3
final E itemAt(int i) { return (E) items[i]; }

1.2,poll():获取当前指针元素,数组为空返回null,数组不为空返回当前指针元素并移除该元素,指针后移

    * poll()

复制代码
1
2
3
4
5
6
7
8
9
10
11
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 数组为空返回null,不为空获取元素 // dequeue:为获取数据底层方法 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }

    * dequeue():元素读取底层方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private E dequeue() { final Object[] items = this.items; // 从数据中获取读指针下标元素 E x = (E) items[takeIndex]; // 获取并移除在此处体现,读取到后直接置null items[takeIndex] = null; // 读指针获取完成后,直接递增,如果已经指向最后一个下标,则回到第一个下标 if (++takeIndex == items.length) takeIndex = 0; // 数据元素数量递减 count--; // 此处迭代器操作,后续迭代器分析 if (itrs != null) itrs.elementDequeued(); // 元素读取并移除后,元素必定未满,唤醒notFull线程,允许元素插入 notFull.signal(); // 返回元素 return x; }

1.3,take():获取当前指针元素,数组为空阻塞,数组不为空获取当前指针元素并移除该元素,指针后移

    * take()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // take方法为空阻塞在此处阻塞,数组元素数量为空,notEmpty线程等待 // 等待数组存在数据插入时,会唤醒该方法 while (count == 0) notEmpty.await(); // 取数据 return dequeue(); } finally { lock.unlock(); } }

2,写源码分析

2.1,add(E e):添加元素,数组满了抛异常,数组未满添加到指针位置

    * add()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean add(E e) { // 向上调用父类方法 return super.add(e); } // java.util.AbstractQueue#add public boolean add(E e) { // 内部调用offer方法,插入成功返回true // 插入失败直接抛异常 if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

2.2,offer(E e):添加元素,数据满了返回false,数组未满返回true

    * offer(E e)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean offer(E e) { // null值校验,此处为null直接抛空指针异常 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 数组已满,返回false if (count == items.length) return false; else { // 数据未满,写数据 enqueue(e); return true; } } finally { lock.unlock(); } }

    * enqueue(E x):元素入列,此为底层方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
private void enqueue(E x) { // 获取元素,并添加元素到写指针位置 final Object[] items = this.items; items[putIndex] = x; // 写指针到末尾,直接置为头索引,并底层 if (++putIndex == items.length) putIndex = 0; // 数组元素数量递增 count++; // 添加成功后,数组元素不为空,唤醒等待读锁 notEmpty.signal(); }

2.3,put(E e):添加元素,数组满了阻塞,数据未满添加成功

    * put(E e)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { // 数组已满,则进行等待,等待读数据后唤醒 while (count == items.length) notFull.await(); // 添加元素 enqueue(e); } finally { lock.unlock(); } }

3,移除源码分析

3.1,remove(Object o):移除元素,移除成功返回true,失败返回false

   * remove(Object o)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { // 数组存在元素,进行移除操作 if (count > 0) { // 从读指针取到写指针,也就是取完完整的数据段 final int putIndex = this.putIndex; int i = takeIndex; do { // 遍历到移除数据,直接进行移除操作 if (o.equals(items[i])) { removeAt(i); return true; } if (++i == items.length) i = 0; } while (i != putIndex); } // 数组不存在元素,直接返回false return false; } finally { lock.unlock(); } }

    * removeAt(final int removeIndex)

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void removeAt(final int removeIndex) { final Object[] items = this.items; // 删除下标等于读指针,直接把该下标元素置空,并递增读指针 if (removeIndex == takeIndex) { items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // 元素数量递减 count--; if (itrs != null) itrs.elementDequeued(); // 删除下标元素不等于读指针,说明删除的是数据段内部节点 } else { final int putIndex = this.putIndex; for (int i = removeIndex;;) { // 首先获取移除元素下标的下一个下标 int next = i + 1; if (next == items.length) next = 0; // 下一个下标位置不等于写指针位置,也就是说明还在数据段内部 if (next != putIndex) { // 用当前元素覆盖删除元素,并循环处理,此处目的是将后续元素统一前移一位, items[i] = items[next]; i = next; } else { // 所有元素前移完成后,原最后一个元素置空,并将写指针置位该位置 // 因为当前元素已经前移一位 items[i] = null; this.putIndex = i; break; } } count--; if (itrs != null) itrs.removedAt(removeIndex); } notFull.signal(); }

3.2,removeAll(Collection<?> c):移除元素组,移除成功返回true,失败返回false

    * removeAll(Collection<?> c):父类方法,内部通过迭代循环移除,迭代部分分析

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean removeAll(Collection<?> c) { Objects.requireNonNull(c); boolean modified = false; // 迭代后循环移除 Iterator<?> it = iterator(); while (it.hasNext()) { if (c.contains(it.next())) { it.remove(); modified = true; } } return modified; }

4,迭代器源码分析,没分析清,网上资料也比较少,参考少,后续再补充,只能说比较麻烦,涉及参数和状态转变太多

4.1,相关元素

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
Itr: // 迭代器下一次迭代的坐标,NONE表示没有 private int cursor; // 迭代的下一个元素 private E nextItem; // 迭代的下一个元素下标,null表示-1,被移除表示-2 private int nextIndex; // 迭代上一个元素 private E lastItem; // 迭代上一个元素下标,null表示-1,被移除表示-2 private int lastRet; // 记录上次迭代的下标,迭代器失效时值为 DETACHED private int prevTakeIndex; // 记录上次循环次数,和cycles进行对比,可知变化 private int prevCycles; // 为null时表示 private static final int NONE = -1; // 元素被调用remove移除 private static final int REMOVED = -2; // 迭代器过期 private static final int DETACHED = -3; Itrs: // 数组列表循环次数,每一次takeIndex到0时加1 int cycles = 0; // 头结点,该链表后续添加节点为头结点 private Node head; // 清理失效节点的标识节点 private Node sweeper = null; // 清理的循环次数 private static final int SHORT_SWEEP_PROBES = 4; private static final int LONG_SWEEP_PROBES = 16;

4.2,Itr():迭代器初始化

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Itr() { // assert lock.getHoldCount() == 0; lastRet = NONE; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 数组元素为空,进行参数初始化 if (count == 0) { cursor = NONE; nextIndex = NONE; prevTakeIndex = DETACHED; } else { // 当前读指针位置 final int takeIndex = ArrayBlockingQueue.this.takeIndex; // 赋值上一次的读指针,该指针位置内部定义,有Itr内部控制 prevTakeIndex = takeIndex; // 赋值下一个节点元素,初始化时赋值为第一个 nextItem = itemAt(nextIndex = takeIndex); // 获取下一次迭代坐标,有下一个元素为下一个元素索引,没有返回-1 cursor = incCursor(takeIndex); // 判断itrs是否已经初始化,未初始化则初始化,初始化则注册 // 此处设计思想有点类似与ThreadLocal // 每一个 AarrayBlockingQueue 可以初始化多个迭代器,每一个迭代器即Itr在Itrs中以Node的形式存在, // Itr被包装为弱引用,在GC时如果没有强引用对象关联,则会被回收掉 if (itrs == null) { itrs = new Itrs(this); } else { itrs.register(this); // 注册完成后对Itrs内的Node节点做清理 itrs.doSomeSweeping(false); } prevCycles = itrs.cycles; } } finally { lock.unlock(); } }

    * incCursor(int index):获取下一个迭代位置

复制代码
1
2
3
4
5
6
7
8
9
private int incCursor(int index) { // 默认迭代位置是当前索引的下一个位置 // 如果当前索引位置与写指针位置重复,即没有后续节点,直接返回-1 if (++index == items.length) index = 0; if (index == putIndex) index = NONE; return index; }

    * register(initial):初始化 Itrs 及注册当前迭代器 Itr

复制代码
1
2
3
4
5
6
7
8
9
10
// 初始化Itrs Itrs(Itr initial) { register(initial); } // 注册迭代器到Itrs void register(Itr itr) { // 直接构造为头结点,其他节点依次挂next节点 // 该链表会把后续的节点挂在上游节点,即后来居上 head = new Node(itr, head); }

    * doSomeSweeping(boolean tryHarder):Itrs 数据清理,如果存在清理标识,从标识清理;不存在,在全链表清理

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
void doSomeSweeping(boolean tryHarder) { // 此处获取清理次数 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; // o 节点表示 p 节点的上一个节点 Node o, p; // 获取清理节点表示 final Node sweeper = this.sweeper; // 根据sweeper判断的标志位 boolean passedGo; // sweeper 为空,则全表扫描清理,o赋null值,p赋头值 // sweeper 不为空,非全表扫描,o赋sweep值,p表示下一个节点 if (sweeper == null) { o = null; p = head; passedGo = true; } else { o = sweeper; p = o.next; passedGo = false; } // 按照指定的循环次数循环 for (; probes > 0; probes--) { // p节点为空,表示头结点或者o节点即清理节点的下一个节点为空 if (p == null) { // passedGo为true,表示p为头节点,头结点为空,直接跳出循环 if (passedGo) break; // 表示p为清理节点的下一个节点,为空重置头头结点 o = null; p = head; passedGo = true; } // 获取节点处理的Itr迭代器,及下一个节点 final Itr it = p.get(); final Node next = p.next; // 如果迭代器为null(GC)回收 // 或者迭代器已经过期(存在数据操作)进行数据清理 if (it == null || it.isDetached()) { // 重置次数为最大次数 probes = LONG_SWEEP_PROBES; // 清理p节点,并重新关联o节点, p.clear(); p.next = null; if (o == null) { head = next; // 如果不存在o节点也不存在下一个节点,则Itrs内没有数据,直接置空 if (next == null) { itrs = null; return; } } else o.next = next; } else { // 节点未失效,继续向下寻找 o = p; } p = next; } // 定义清理标识位置 this.sweeper = (p == null) ? null : o; }

    * isDetached():迭代器过期判断

复制代码
1
2
3
4
boolean isDetached() { // prevTakeIndex小于0,说明线程已经对数据进行变更,并将该迭代器的状态置位过期 return prevTakeIndex < 0; }

4.3,elementDequeued():takeIndex变更引起的变更处理

    * elementDequeued():取数据触发

复制代码
1
2
3
4
5
6
7
8
void elementDequeued() { // 队列中元素为空,进行空处理 if (count == 0) queueIsEmpty(); // 读指针为0,说明已经走过一次循环 else if (takeIndex == 0) takeIndexWrapped(); }

    * queueIsEmpty()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
void queueIsEmpty() { // 对Itrs进行置null // Itrs内部包装节点Node进行清空并关闭 for (Node p = head; p != null; p = p.next) { Itr it = p.get(); if (it != null) { p.clear(); it.shutdown(); } } head = null; itrs = null; }

    * takeIndexWrapped()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// java.util.concurrent.ArrayBlockingQueue.Itrs#takeIndexWrapped void takeIndexWrapped() { cycles++; for (Node o = null, p = head; p != null;) { final Itr it = p.get(); final Node next = p.next; // 进行过期判断及处理,此处设置获取,取数据时如果满足重置条件会重置 if (it == null || it.takeIndexWrapped()) { p.clear(); p.next = null; if (o == null) head = next; else o.next = next; } else { o = p; } p = next; } if (head == null) // no more iterators to track itrs = null; }

4.4,hasNext():判断下一个元素

    * hasNext()

复制代码
1
2
3
4
5
6
7
8
public boolean hasNext() { // Itr初始化时,已经对nextItem进行赋值,如果不为空,说明存在,返回true if (nextItem != null) return true; // 下一个节点不存在处理 noNext(); return false; }

    * noNext()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void noNext() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 判断当前迭代器是否已经过期 // 没有过期继续操作 if (!isDetached()) { // 判断元素是否发生移动,移动后判断是否过期及对应参数重新赋值 incorporateDequeues(); if (lastRet >= 0) { // 获取上一个迭代元素,并失效当前Itr lastItem = itemAt(lastRet); detach(); } } } finally { lock.unlock(); } }

    * incorporateDequeues()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private void incorporateDequeues() { // 当前循环次数 final int cycles = itrs.cycles; // 读指针索引,当前读指针索引 final int takeIndex = ArrayBlockingQueue.this.takeIndex; // 上一次循环次数 final int prevCycles = this.prevCycles; // 迭代时指针索引,指创建迭代器时候的初始索引,后续可能会被变更 final int prevTakeIndex = this.prevTakeIndex; // 如果循环次数不一致或者指针索引不一致,说明发生了数据偏移 if (cycles != prevCycles || takeIndex != prevTakeIndex) { final int len = items.length; // 获取数据偏移量 long dequeues = (cycles - prevCycles) * len + (takeIndex - prevTakeIndex); // 判断上一个获取节点是否需要失效 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) lastRet = REMOVED; // 判断下一个节点是否需要失效 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) nextIndex = REMOVED; // 判断是否存在下一个迭代元素 if (invalidated(cursor, prevTakeIndex, dequeues, len)) cursor = takeIndex; // 需要失效处理,则失效 if (cursor < 0 && nextIndex < 0 && lastRet < 0) detach(); // 不失效,则重置上一次值为当前值 else { this.prevCycles = cycles; this.prevTakeIndex = takeIndex; } } }

    * invalidated(int index, int prevTakeIndex, long dequeues, int length):失效算法判断,目前没搞懂

复制代码
1
2
3
4
5
6
7
8
private boolean invalidated(int index, int prevTakeIndex, long dequeues, int length) { if (index < 0) return false; int distance = index - prevTakeIndex; if (distance < 0) distance += length; return dequeues > distance; }

4.5,next():获取下一个元素

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E next() { // 迭代器初始化后,如果此时将队列清空,因为nextItem已经初始化,此时依旧会返回初始化迭代时候的第一个元素 // 下一个元素不存在,直接空指针 final E x = nextItem; if (x == null) throw new NoSuchElementException(); final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 迭代器未失效,进行数据偏移矫正或者直接失效 if (!isDetached()) incorporateDequeues(); // 重置上一个节点及下一个节点的下标和元素 lastRet = nextIndex; final int cursor = this.cursor; // 存在下一个坐标 if (cursor >= 0) { nextItem = itemAt(nextIndex = cursor); // 下一次迭代坐标递增 this.cursor = incCursor(cursor); // 不存在下一个坐标,直接对相关元素置空 } else { nextIndex = NONE; nextItem = null; } } finally { lock.unlock(); } return x; }

4.6,remove():移除当前元素

    * remove()

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void remove() { final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { // 如果没有过期,进行元素位置矫正 if (!isDetached()) incorporateDequeues(); // 获取上一个获取的元素坐标 // Itr初始化时候,lastRet已经初始化为-1 // 调用next():会对lastRet进行重赋值,赋当时的next值 final int lastRet = this.lastRet; this.lastRet = NONE; // lastRet >= 0,说明已经调用next初始化 if (lastRet >= 0) { // Itr没有过期,直接移除 if (!isDetached()) // 调用ArrayBlockingQueue移除 removeAt(lastRet); else { // Itr已经过期,对lastItem进行处理 final E lastItem = this.lastItem; this.lastItem = null; if (itemAt(lastRet) == lastItem) removeAt(lastRet); } // 为NONE,说明没有初始化 } else if (lastRet == NONE) throw new IllegalStateException(); // 不存在下一个元素,直接失效 if (cursor < 0 && nextIndex < 0) detach(); } finally { lock.unlock(); } }

    * removedAt(int removedIndex):调用Itrs移除

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// java.util.concurrent.ArrayBlockingQueue.Itrs#removedAt void removedAt(int removedIndex) { // 遍历Itrs中整串Itr for (Node o = null, p = head; p != null;) { final Itr it = p.get(); final Node next = p.next; // Itr 为null,说明已经被GC回收,直接清空回收处理 // Itr不为空,判断是否可以移除 if (it == null || it.removedAt(removedIndex)) { p.clear(); p.next = null; if (o == null) head = next; else o.next = next; } else { o = p; } p = next; } // head为null,说明已经遍历完成,则直接置空Itrs if (head == null) itrs = null; }

    * removedAt(int removedIndex):调用Itr判断是否可以移除,没懂,有空再分析吧

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// java.util.concurrent.ArrayBlockingQueue.Itr#removedAt boolean removedAt(int removedIndex) { if (isDetached()) return true; final int cycles = itrs.cycles; final int takeIndex = ArrayBlockingQueue.this.takeIndex; final int prevCycles = this.prevCycles; final int prevTakeIndex = this.prevTakeIndex; final int len = items.length; int cycleDiff = cycles - prevCycles; if (removedIndex < takeIndex) cycleDiff++; final int removedDistance = (cycleDiff * len) + (removedIndex - prevTakeIndex); int cursor = this.cursor; if (cursor >= 0) { int x = distance(cursor, prevTakeIndex, len); if (x == removedDistance) { if (cursor == putIndex) this.cursor = cursor = NONE; } else if (x > removedDistance) { this.cursor = cursor = dec(cursor); } } int lastRet = this.lastRet; if (lastRet >= 0) { int x = distance(lastRet, prevTakeIndex, len); if (x == removedDistance) this.lastRet = lastRet = REMOVED; else if (x > removedDistance) this.lastRet = lastRet = dec(lastRet); } int nextIndex = this.nextIndex; if (nextIndex >= 0) { int x = distance(nextIndex, prevTakeIndex, len); if (x == removedDistance) this.nextIndex = nextIndex = REMOVED; else if (x > removedDistance) this.nextIndex = nextIndex = dec(nextIndex); } else if (cursor < 0 && nextIndex < 0 && lastRet < 0) { this.prevTakeIndex = DETACHED; return true; } return false; }

 

最后

以上就是正直大碗最近收集整理的关于并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue 概述二,锁操作及指针操作三,源码分析的全部内容,更多相关并发编程(十三):阻塞队列之ArrayBlockingQueue一,关联源码链接二,ArrayBlockingQueue内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(81)

评论列表共有 0 条评论

立即
投稿
返回
顶部