概述
一、为什么要使用并发容器:同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。并发容器主要解决了两个问题:
- 根据具体场景进行设计,尽量避免synchronized,提供并发性。
- 定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
util.concurrent中容器在迭代时,可以不封装在synchronized中,可以保证不抛异常,但是未必每次看到的都是”最新的、当前的”数据。
二、并发容器有哪些:
- CopyOnWriteArrayList ->ArrayList:
数据结构图
说明:
(1) CopyOnWriteArrayList实现了List接口,因此它是一个队列。
(2) CopyOnWriteArrayList包含了成员lock。每一个CopyOnWriteArrayList都和 一个互斥锁lock绑定,通过lock,实现了对CopyOnWriteArrayList的互斥访问。
(3) CopyOnWriteArrayList包含了成员array数组,这说明CopyOnWriteArrayList本质上通过数组实现的。
下面从“动态数组”和“线程安全”两个方面进一步对CopyOnWriteArrayList的原理进行说明。
CopyOnWriteArrayList的“动态数组”机制 -- 它内部有个“volatile数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”。这就是它叫做CopyOnWriteArrayList的原因!CopyOnWriteArrayList就是通过这种方式实现的动态数组;不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很
低;但是单单只是进行遍历查找的话,效率比较高。
CopyOnWriteArrayList的“线程安全”机制 -- 是通过volatile和互斥锁来实现的。(01) CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入;就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的
保证。(02) CopyOnWriteArrayList通过互斥锁来保护数据。在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的。
缺点:
a. 写操作时复制消耗内存,如果元素比较多时候,容易导致young gc 和full gc。
b. 不能用于实时读的场景.由于复制和add操作等需要时间,故读取时可能读到旧值。 能做到最终一致性,但无法满足实时性的要求,更适合读多写少的场景。
c. 如果无法知道数组有多大,或者add,set操作有多少,慎用此类,在大量的复制副本的过程中很容易出错。
设计思想:
a. 读写分离
b. 最终一致性
c. 使用时另外开辟空间,防止并发冲突
源码分析:
构造方法
public CopyOnWriteArrayList(Collection<? extends E> c) {
Object[] elements;
if (c.getClass() == CopyOnWriteArrayList.class)
elements = ((CopyOnWriteArrayList<?>)c).getArray();
else {
elements = c.toArray();
// c.toArray might (incorrectly) not return Object[] (see 6260652)
if (elements.getClass() != Object[].class)
elements = Arrays.copyOf(elements, elements.length, Object[].class);
}
setArray(elements);
}
添加数据的方法
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
获取对象的方法
private E get(Object[] a, int index) {
return (E) a[index];
}
事例代码:
@ThreadSafe
public class CopyOnWriteArrayListExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i) {
list.add(i);
}
}
2. CopyOnWriteArraySet->HashSet:
(1) 它是线程安全的,底层实现使用的是CopyOnWriteArrayList,因此它也适用于大小很小的set集合,只读操作远大于可变操作。因为他需要copy整个数组,所以包括add、remove、set它的开销相对于大一些。
(2) 迭代器不支持可变的remove操作。使用迭代器遍历的时候速度很快,而且不会与其他线程发生冲突。
(3) 源码分析:
构造函数
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
添加元素的方法
private boolean addIfAbsent(E e, Object[] snapshot) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] current = getArray();
int len = current.length;
if (snapshot != current) {
// Optimize for lost race to another addXXX operation
int common = Math.min(snapshot.length, len);
for (int i = 0; i < common; i++)
if (current[i] != snapshot[i] && eq(e, current[i]))
return false;
if (indexOf(e, current, common, len) >= 0)
return false;
}
Object[] newElements = Arrays.copyOf(current, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
事例代码:
@ThreadSafe
public class CopyOnWriteArraySetExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Set<Integer> set = new CopyOnWriteArraySet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
3. ConcurrentSkipListSet–>TreeSet:
它是JDK6新增的类,同TreeSet一样支持自然排序,并且可以在构造的时候自己定义比较器。
(1) 同其他set集合,是基于map集合的(基于ConcurrentSkipListMap),在多线程环境下,里面的contains、add、remove操作都是线程安全的。
(2) 多个线程可以安全的并发的执行插入、移除、和访问操作。但是对于批量操作addAll、removeAll、retainAll和containsAll并不能保证以原子方式执行,原因是addAll、removeAll、retainAll底层调用的还是contains、add、remove方法,只能保证每一次的执行是原子性的,代表在单一执行操纵时不会被打断,但是不能保证每一次批量操作都不会被打断。在使用批量操作时,还是需要手动加上同步操作的。
(3) 不允许使用null元素的,它无法可靠的将参数及返回值与不存在的元素区分开来。
(4) 源码分析:
构造方法:
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
//使用ConcurrentSkipListMap实现
}
事例代码:
@ThreadSafe
public class ConcurrentSkipListSetExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Set<Integer> set = new ConcurrentSkipListSet<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", set.size());
}
private static void update(int i) {
set.add(i);
}
}
4. ConcurrentHashMap–>HashMap
(1) 不允许空值,在实际的应用中除了少数的插入操作和删除操作外,绝大多数我们使用map都是读取操作。而且读操作大多数都是成功的。基于这个前提,它针对读操作做了大量的优化。因此这个类在高并发环境下有特别好的表现。
(2) ConcurrentHashMap作为Concurrent一族,其有着高效地并发操作,相比Hashtable的笨重,ConcurrentHashMap则更胜一筹了。
(3) 在1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是1.8已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,当然底层采用数组+链表+红黑树的存储结构。
事例代码:
@ThreadSafe
public class ConcurrentHashMapExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
5. ConcurrentSkipListMap–>TreeMap
(1) 底层实现采用SkipList跳表
(2) 曾经有人用ConcurrentHashMap与ConcurrentSkipListMap做性能测试,在4个线程1.6W的数据条件下,前者的数据存取速度 是后者的4倍左右。但是后者有几个前者不能比拟的优点:
a.Key是有序的
b.支持更高的并发,存储时间与线程数无关
事例代码:
@ThreadSafe
public class ConcurrentSkipListMapExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentSkipListMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e) {
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
6. 安全共享对象策略
(1) 线程限制:一个被线程限制的对象,由线程独占,并且只能被占有它的线程修改
(2) 共享只读:一个共享只读的对象,在没有额外同步的情况下,可以被多个线程并发访问,但是任何线程都不能修改它
(3) 线程安全对象:一个线程安全的对象或者容器,在内部通过同步机制来保障线程安全,所以其他线程无需额外的同步就可以通过公共接口随意访问他
(4) 被守护对象:被守护对象只能通过获取特定的锁来访问。
最后
以上就是直率帅哥为你收集整理的多线程理解(十) 并发容器的全部内容,希望文章能够帮你解决多线程理解(十) 并发容器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复