1.JAVA多线程(二十七)Java多线程之LinkedBlockingQueue容器
1.1 什么是LinkedBlockingQueue
LinkedBlockingQueue 底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用,同样满足 FIFO 的特性,与 ArrayBlockingQueue 相比起来具有更高的吞吐量,为了防止 LinkedBlockingQueue 容量迅速增,损耗大量内存。通常在创建 LinkedBlockingQueue 对象时,会指定其大小,如果未指定,容量等于 Integer.MAX_VALUE。
下面是 LinkedBlockingQueue继承结构关系图:
1.2 LinkedBlockingQueue单向链表实现的阻塞队列
LinkedBlockingQueue是一个阻塞队列,内部由两个ReentrantLock来实现出入队列的线程安全,由各自的Condition对象的await和signal来实现等待和唤醒功能。它和ArrayBlockingQueue的不同点在于:
- 队列大小有所不同,ArrayBlockingQueue是有界的初始化必须指定大小,而LinkedBlockingQueue可以是有界的也可以是无界的(Integer.MAX_VALUE),对于后者而言,当添加速度大于移除速度时,在无界的情况下,可能会造成内存溢出等问题。
- 数据存储容器不同,ArrayBlockingQueue采用的是数组作为数据存储容器,而LinkedBlockingQueue采用的则是以Node节点作为连接对象的链表。
- 由于ArrayBlockingQueue采用的是数组的存储容器,因此在插入或删除元素时不会产生或销毁任何额外的对象实例,而LinkedBlockingQueue则会生成一个额外的Node对象。这可能在长时间内需要高效并发地处理大批量数据的时,对于GC可能存在较大影响。
- 两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
通过源代码查看LinkedBlockingQueue实现:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55/** * 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE 。 * * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * 创建一个具有给定(固定)容量的 LinkedBlockingQueue 。 * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } /** * 创建一个 LinkedBlockingQueue ,容量为 Integer.MAX_VALUE ,最初包含给定集合的元素,以集合的迭代* 器的遍历顺序添加。 * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. * * @param c the collection of elements to initially contain * @throws NullPointerException if the specified collection or any * of its elements are null */ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
1.3 常用的方法
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211package com.yuanxw.thread.chapter27; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class LinkedBlockingQueueExample { public static void main(String[] args) throws InterruptedException { // add(); // offer(); // put(); // poll(); // peek(); // element(); remove(); } /** 如果可以在不超过队列的容量的情况下立即将其指定的元素插入到该队列, 如果队列已满,则返回 true并抛出 IllegalStateException 。 执行结果: =====执行add()签名方法-开始===== linkedBlockingQueue.add()执行返回结果:true linkedBlockingQueue.add()执行返回结果:true linkedBlockingQueue.add()执行返回结果:true Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.add(LinkedBlockingQueueExample.java:38) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.main(LinkedBlockingQueueExample.java:9) */ public static void add() { System.out.println("=====执行add()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); System.out.println("linkedBlockingQueue.add()执行返回结果:"+linkedBlockingQueue.add("Message 1")); System.out.println("linkedBlockingQueue.add()执行返回结果:"+linkedBlockingQueue.add("Message 2")); System.out.println("linkedBlockingQueue.add()执行返回结果:"+linkedBlockingQueue.add("Message 3")); System.out.println("linkedBlockingQueue.add()执行返回结果:"+linkedBlockingQueue.add("Message 4")); System.out.println("=====执行add()签名方法-结束====="); } /** * 如果可以在不超过队列容量的情况下立即将其指定的元素插入该队列的尾部, * 则在成功时true如果该队列已满,则返回false 。 执行结果: =====执行offer()签名方法-开始===== linkedBlockingQueue.offer()执行返回结果:true linkedBlockingQueue.offer()执行返回结果:true linkedBlockingQueue.offer()执行返回结果:true linkedBlockingQueue.offer()执行返回结果:false =====执行offer()签名方法-结束===== */ public static void offer(){ System.out.println("=====执行offer()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); System.out.println("linkedBlockingQueue.offer()执行返回结果:"+linkedBlockingQueue.offer("Message 1")); System.out.println("linkedBlockingQueue.offer()执行返回结果:"+linkedBlockingQueue.offer("Message 2")); System.out.println("linkedBlockingQueue.offer()执行返回结果:"+linkedBlockingQueue.offer("Message 3")); System.out.println("linkedBlockingQueue.offer()执行返回结果:"+linkedBlockingQueue.offer("Message 4")); System.out.println("=====执行offer()签名方法-结束====="); } /** * 在该队列的尾部插入指定的元素,如果队列已满,则等待空间变为可用。 执行结果: =====执行put()签名方法-开始===== 当前linkedBlockingQueue对象中的个数:3 当前linkedBlockingQueue对象中的容量:0 当前linkedBlockingQueue对象中take()数据值为:Message 1 Message 2 Message 3 Message 4 =====执行put()签名方法-结束===== */ public static void put() throws InterruptedException { System.out.println("=====执行put()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.put("Message 1"); linkedBlockingQueue.put("Message 2"); linkedBlockingQueue.put("Message 3"); Executors.newSingleThreadExecutor().execute(()->{ System.out.println("当前linkedBlockingQueue对象中的个数:" + linkedBlockingQueue.size()); System.out.println("当前linkedBlockingQueue对象中的容量:" + linkedBlockingQueue.remainingCapacity()); try { TimeUnit.SECONDS.sleep(5); // 检索并删除此队列的头 Object take = linkedBlockingQueue.take(); System.out.println("当前linkedBlockingQueue对象中take()数据值为:" + take); } catch (InterruptedException e) { e.printStackTrace(); } }); linkedBlockingQueue.put("Message 4"); linkedBlockingQueue.forEach(System.out::println); System.out.println("=====执行put()签名方法-结束====="); } /** * 检索并删除此队列的头,则等待空间变为可用。 执行结果: =====执行poll()签名方法-开始===== 当前linkedBlockingQueue对象中的个数:【3】,容量:【0】 当前linkedBlockingQueue对象中poll()数据值为:Message 1 当前linkedBlockingQueue对象中poll()数据值为:Message 2 当前linkedBlockingQueue对象中的个数:【1】,容量:【2】 当前linkedBlockingQueue对象中poll()数据值为:Message 3 当前linkedBlockingQueue对象中poll()数据值为:null 当前linkedBlockingQueue对象中poll()数据值为:null =====执行poll()签名方法-结束===== */ public static void poll() throws InterruptedException { System.out.println("=====执行poll()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.put("Message 1"); linkedBlockingQueue.put("Message 2"); linkedBlockingQueue.put("Message 3"); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中poll()数据值为:" + linkedBlockingQueue.poll()); System.out.println("当前linkedBlockingQueue对象中poll()数据值为:" + linkedBlockingQueue.poll()); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中poll()数据值为:" + linkedBlockingQueue.poll()); System.out.println("当前linkedBlockingQueue对象中poll()数据值为:" + linkedBlockingQueue.poll()); System.out.println("当前linkedBlockingQueue对象中poll()数据值为:" + linkedBlockingQueue.poll()); System.out.println("=====执行poll()签名方法-结束====="); } /** * 检索但不删除此队列的头,如果此队列为空,则返回 null。 执行结果: =====执行peek()签名方法-开始===== 当前linkedBlockingQueue对象中的个数:【3】,容量:【0】 当前linkedBlockingQueue对象中peek()数据值为:Message 1 当前linkedBlockingQueue对象中peek()数据值为:Message 1 当前linkedBlockingQueue对象中的个数:【3】,容量:【0】 当前linkedBlockingQueue对象中peek()数据值为:Message 1 当前linkedBlockingQueue对象中peek()数据值为:Message 1 当前linkedBlockingQueue对象中peek()数据值为:Message 1 =====执行peek()签名方法-结束===== */ public static void peek() throws InterruptedException { System.out.println("=====执行peek()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.put("Message 1"); linkedBlockingQueue.put("Message 2"); linkedBlockingQueue.put("Message 3"); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中peek()数据值为:" + linkedBlockingQueue.peek()); System.out.println("当前linkedBlockingQueue对象中peek()数据值为:" + linkedBlockingQueue.peek()); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中peek()数据值为:" + linkedBlockingQueue.peek()); System.out.println("当前linkedBlockingQueue对象中peek()数据值为:" + linkedBlockingQueue.peek()); System.out.println("当前linkedBlockingQueue对象中peek()数据值为:" + linkedBlockingQueue.peek()); System.out.println("=====执行peek()签名方法-结束====="); } /** * 检索,但不删除,这个队列的头。 此方法与peek的不同之处在于,如果此队列为空,它将抛出异常。 执行结果: =====执行element()签名方法-开始===== 当前linkedBlockingQueue对象中的个数:【3】,容量:【0】 当前linkedBlockingQueue对象中的个数:【0】,容量:【3】 Exception in thread "main" java.util.NoSuchElementException at java.util.AbstractQueue.element(AbstractQueue.java:136) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.element(LinkedBlockingQueueExample.java:181) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.main(LinkedBlockingQueueExample.java:14) */ public static void element() throws InterruptedException { System.out.println("=====执行element()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.put("Message 1"); linkedBlockingQueue.put("Message 2"); linkedBlockingQueue.put("Message 3"); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); linkedBlockingQueue.clear(); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中element()数据值为:" + linkedBlockingQueue.element()); System.out.println("=====执行element()签名方法-结束====="); } /** * 检索并删除此队列的头。 此方法与poll不同之处在于,如果此队列为空,它将抛出异常。 执行结果: =====执行remove()签名方法-开始===== 当前linkedBlockingQueue对象中的个数:【3】,容量:【0】 当前linkedBlockingQueue对象中remove()数据值为:Message 1 当前linkedBlockingQueue对象中remove()数据值为:Message 2 当前linkedBlockingQueue对象中remove()数据值为:Message 3 Exception in thread "main" java.util.NoSuchElementException at java.util.AbstractQueue.remove(AbstractQueue.java:117) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.remove(LinkedBlockingQueueExample.java:205) at com.yuanxw.thread.chapter27.LinkedBlockingQueueExample.main(LinkedBlockingQueueExample.java:15) */ public static void remove() throws InterruptedException { System.out.println("=====执行remove()签名方法-开始====="); LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.put("Message 1"); linkedBlockingQueue.put("Message 2"); linkedBlockingQueue.put("Message 3"); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("当前linkedBlockingQueue对象中remove()数据值为:" + linkedBlockingQueue.remove()); System.out.println("当前linkedBlockingQueue对象中remove()数据值为:" + linkedBlockingQueue.remove()); System.out.println("当前linkedBlockingQueue对象中remove()数据值为:" + linkedBlockingQueue.remove()); System.out.println("当前linkedBlockingQueue对象中remove()数据值为:" + linkedBlockingQueue.remove()); System.out.println(String.format("当前linkedBlockingQueue对象中的个数:【%s】,容量:【%s】", linkedBlockingQueue.size(),linkedBlockingQueue.remainingCapacity())); System.out.println("=====执行remove()签名方法-结束====="); } }
– 以上为《JAVA多线程(二十七)Java多线程之LinkedBlockingQueue容器》,如有不当之处请指出,我后续逐步完善更正,大家共同提高。谢谢大家对我的关注。
——厚积薄发(yuanxw)
最后
以上就是无辜衬衫最近收集整理的关于JAVA多线程(二十七)Java多线程之LinkedBlockingQueue容器的全部内容,更多相关JAVA多线程(二十七)Java多线程之LinkedBlockingQueue容器内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复