1. 前言
java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
- 使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
2. 原子整数
以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的!
2.1 相关api:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30public static void main(String[] args) { AtomicInteger i = new AtomicInteger(0); // 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.getAndIncrement()); // 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.incrementAndGet()); // 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i System.out.println(i.decrementAndGet()); // 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i-- System.out.println(i.getAndDecrement()); // 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5)); // 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5)); // 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.getAndUpdate(p -> p - 2)); // 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.updateAndGet(p -> p + 2)); // 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x)); // 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0) // 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x)); }
2.2 AtomicInteger源码:
1
2
3
4
5
6
7
8
9
10
11
12public class AtomicInteger extends Number implements java.io.Serializable { private static final long serialVersionUID = 6214790243416807050L; /* * This class intended to be implemented using VarHandles, but there * are unresolved cyclic startup dependencies. */ private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value"); private volatile int value;
Unsafe是CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门:Unsafe,它提供了硬件级别的原子操作。通过它来调用一些native方法。
VALUE 为变量值在内存中的偏移地址,unsafe就是通过偏移地址来得到数据的原值的。
value当前值,使用volatile修饰,保证多线程环境下看见的是同一个。
我们就以AtomicInteger的addAndGet()方法来做说明,看源代码:
2.3 addAndGet源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public final int addAndGet(int delta) { return U.getAndAddInt(this, VALUE, delta) + delta; } public final int getAndAddInt(Object o, long offset, int delta) { int v; // 自旋锁,直到能够更换为止 do { // 获取内存地址中的值 v = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; } // 如果内存地址中的值offset和我传过来的值还是一样的 // 就将当前内存中的值赋为x public final boolean weakCompareAndSetInt(Object o, long offset, int expected, int x) { return compareAndSetInt(o, offset, expected, x); } // 四个参数,分别代表:对象、对象的地址、预期值、修改值 public final native boolean compareAndSetInt(Object o, long offset, int expected, int x);
3.原子引用
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
使用原子引用实现BigDecimal存取款的线程安全
下面这个是不安全的实现过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16class DecimalAccountUnsafe implements DecimalAccount { BigDecimal balance; public DecimalAccountUnsafe(BigDecimal balance) { this.balance = balance; } @Override public BigDecimal getBalance() { return balance; } @Override public void withdraw(BigDecimal amount) { BigDecimal balance = this.getBalance(); this.balance = balance.subtract(amount); } }
解决代码如下:在AtomicReference类中,存在一个value类型的变量,保存对BigDecimal对象的引用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27class DecimalAccountCas implements DecimalAccount{ //private BigDecimal balance; private AtomicReference<BigDecimal> balance ; public DecimalAccountCas(BigDecimal balance) { this.balance = new AtomicReference<>(balance); } @Override public BigDecimal getBalance() { return balance.get(); } @Override public void withdraw(BigDecimal amount) { while(true){ BigDecimal pre = balance.get(); // 注意:这里的balance返回的是一个新的对象,即 pre!=next BigDecimal next = pre.subtract(amount); if (balance.compareAndSet(pre,next)){ break; } } } }
3.1 ABA问题:
如下程序所示,虽然在other方法中存在两个线程对共享变量进行了修改,但是修改之后又变成了原值,main线程中对此是不可见得,这种操作对业务代码并无影响:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24static AtomicReference<String> ref = new AtomicReference<>("A"); public static void main(String[] args) throws InterruptedException { log.debug("main start"); // 获取值A String prev = ref.get(); other(); Thread.sleep(1000); log.debug("A -> C {}", ref.compareAndSet(prev, "C")); } private static void other() throws InterruptedException { new Thread(() -> { log.debug("A -> B {}", ref.compareAndSet(ref.get(), "B")); }, "t1").start(); Thread.sleep(500); new Thread(() -> { // 注意:如果这里使用 log.debug("change B->A {}", ref.compareAndSet(ref.get(), new String("A"))); // 那么此实验中的 log.debug("change A->C {}", ref.compareAndSet(prev, "C")); // 打印的就是false, 因为new String("A") 返回的对象的引用和"A"返回的对象的引用是不同的! log.debug("B -> A {}", ref.compareAndSet(ref.get(), "A")); }, "t2").start(); }
结果:
1
2
3
4
513:49:06.903 [main] DEBUG aba - main start 13:49:06.909 [t1] DEBUG aba - A -> B true 13:49:07.415 [t2] DEBUG aba - B -> A true 13:49:08.425 [main] DEBUG aba - A -> C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。
3.2 ABA问题解决:AtomicStampedReference
Java提供了AtomicStampedReference来解决。AtomicStampedReference通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题。
AtomicStampedReference的compareAndSet()方法定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
compareAndSet有四个参数,分别表示:预期引用、更新后的引用、预期标志、更新后的标志。
如果更新后的引用和标志和当前的引用和标志相等则直接返回true,否则通过Pair生成一个新的pair对象与当前pair CAS替换。Pair为AtomicStampedReference的内部类,主要用于记录引用和版本戳信息(标识),定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14private static class Pair<T> { final T reference; final int stamp; private Pair(T reference, int stamp) { this.reference = reference; this.stamp = stamp; } static <T> Pair<T> of(T reference, int stamp) { return new Pair<T>(reference, stamp); } } private volatile Pair<V> pair;
Pair记录着对象的引用和版本戳,版本戳为int型,保持自增。同时Pair是一个不可变对象,其所有属性全部定义为final,对外提供一个of方法,该方法返回一个新建的Pari对象。pair对象定义为volatile,保证多线程环境下的可见性。在AtomicStampedReference中,大多方法都是通过调用Pair的of方法来产生一个新的Pair对象,然后赋值给变量pair。如set方法:
1
2
3
4
5
6public void set(V newReference, int newStamp) { Pair<V> current = pair; if (newReference != current.reference || newStamp != current.stamp) this.pair = Pair.of(newReference, newStamp); }
3.2.1 使用AtomicStampedReference解决aba问题:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A int stamp = ref.getStamp(); log.info("main stamp:{}",stamp); String prev = ref.getReference(); other(); Thread.sleep(1000); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1)); } private static void other() throws InterruptedException { new Thread(() -> { int stamp = ref.getStamp(); log.info("t1 stamp:{}",stamp); log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1)); }, "t1").start(); Thread.sleep(500); new Thread(() -> { int stamp = ref.getStamp(); log.info("t2 stamp:{}",stamp); log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1)); }, "t2").start(); }
结果:
1
2
3
4
5
6
7
814:13:48.082 [main] DEBUG aba - main start... 14:13:48.086 [main] INFO aba - main stamp:0 14:13:48.089 [t1] INFO aba - t1 stamp:0 14:13:48.090 [t1] DEBUG aba - change A->B true 14:13:48.603 [t2] INFO aba - t2 stamp:1 14:13:48.603 [t2] DEBUG aba - change B->A true 14:13:49.617 [main] DEBUG aba - change A->C false
3.2.3 compare比较的是地址:
同时,从上面compareAndSet的源码我们看到compare比较的比较方式都是用的双等号,即比较的是引用的地址,如果是使用Integer类型,1000==1000
的判断是false, 对于其他的对象,只有同一个new出来的对象才是相等的。
代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27static AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(200,0); public static void main(String[] args) throws InterruptedException { log.debug("main start..."); // 获取值 A int stamp = ref.getStamp(); log.info("main stamp:{}",stamp); Integer prev = ref.getReference(); other(); Thread.sleep(1000); // 尝试改为 C log.debug("change A->C {}", ref.compareAndSet(prev, 400,stamp,stamp+1)); } private static void other() throws InterruptedException { new Thread(() -> { int stamp = ref.getStamp(); log.info("t1 stamp:{}",stamp); log.debug("change A->B {}", ref.compareAndSet(200, 300,stamp,stamp+1)); }, "t1").start(); Thread.sleep(500); new Thread(() -> { int stamp = ref.getStamp(); log.info("t2 stamp:{}",stamp); log.debug("change B->A {}", ref.compareAndSet(300, 200,stamp,stamp+1)); }, "t2").start(); }
解决方式:
1
2
3// 比较时获取,或者值小于128 log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), 300,stamp,stamp+1));
3.3 AtomicMarkableReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
源码:
1
2
3
4
5
6
7
8
9
10
11
12
13public boolean compareAndSet(V expectedReference, V newReference, boolean expectedMark, boolean newMark) { Pair<V> current = pair; return expectedReference == current.reference && expectedMark == current.mark && ((newReference == current.reference && newMark == current.mark) || casPair(current, Pair.of(newReference, newMark))); }
通过AtomicMarkableReference的源码我们可以看到,他的标识是布尔类型的,也就是说,我们不需要关心更新了几次,我们只关心有没有更新。
4. 原子数组
使用原子的方式更新数组里的某个元素,主要包含三个:
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
关于函数式编程请看:四大函数式接口
代码演示:
创建一个长度为10的数组,然后创建10个线程,每个线程循环10000次,分别对数组下标0~9的元素自行自增操作,最后查看数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53public class TestAtomicArray { public static void main(String[] args) { TestAtomicArray.demo( ()->new AtomicIntegerArray(10), (array)-> array.length(), (array,index)-> array.getAndIncrement(index), (array)->System.out.println(array) ); TestAtomicArray.demo( ()->new int[10], (array)-> array.length, (array,index)-> array[index]++, (array)->System.out.println(Arrays.toString(array)) ); } /** 参数1,提供数组、可以是线程不安全数组或线程安全数组 参数2,获取数组长度的方法 参数3,自增方法,传 array, index两个参数 array为数组,index为数组元素每次自增的元素的下标 参数4,打印数组的方法 */ // supplier 提供者 无中生有 ()->结果 // function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果 // consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)-> public static <T> void demo( Supplier<T> arraySupplier, Function<T,Integer> lengthFun, BiConsumer<T,Integer> putConsumer, Consumer<T> printConsumer){ List<Thread> ts = new ArrayList<>(); T array = arraySupplier.get(); Integer length = lengthFun.apply(array); for (int i = 0;i<length;i++){ ts.add(new Thread(()->{ for (int j=0;j<10000;j++){ putConsumer.accept(array,j%length); } })); } ts.forEach(t->t.start()); ts.forEach(t->{ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); printConsumer.accept(array); } }
结果:
1
2
3[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000] [6857, 6840, 6815, 6802, 6825, 6847, 6845, 6850, 6824, 6768]
可以看到使用atomic初始化的数组,正确地输出了结果
5.字段更新器
使用原子的方式更新对象中的属性,主要包括三个类:
- AtomicReferenceFieldUpdater
- AtomicIntegerFieldUpdater // 字段类型是integer
- AtomicLongFieldUpdater // 字段类型是long
注意,属性必须用volatile来修饰
字段更新器使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14public class TestAtomicField { public static void main(String[] args) { Student student = new Student(); AtomicReferenceFieldUpdater u = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name"); System.out.println(u.compareAndSet(student, null, "张三")); } } @Data class Student { // 注意需要用volatile修饰保证可见性 volatile String name; }
结果:
1
2true
6.原子累加器
顾名思义,原子累加器,就是对数字进行累加操作;
在jdk8之后,jdk专门给我们新增了几个用于累加的类:
代码展示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public static void main(String[] args) { for (int i = 0; i < 5; i++) { demo(() -> new LongAdder(), adder -> adder.increment()); } for (int i = 0; i < 5; i++) { demo(() -> new AtomicLong(), adder -> adder.getAndIncrement()); } } private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) { T adder = adderSupplier.get(); long start = System.nanoTime(); List<Thread> ts = new ArrayList<>(); // 4个线程,每人累加 50 万 for (int i = 0; i < 4; i++) { ts.add(new Thread(() -> { for (int j = 0; j < 500000; j++) { action.accept(adder); } })); } ts.forEach(t -> t.start()); ts.forEach(t -> { try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.nanoTime(); System.out.println(adder + " cost:" + (end - start)/1000_000); }
结果:
1
2
3
4
5
6
7
8
9
10
1120000000 cost:91 20000000 cost:72 20000000 cost:43 20000000 cost:45 20000000 cost:51 20000000 cost:558 20000000 cost:551 20000000 cost:579 20000000 cost:514 20000000 cost:503
可以看出,LongAdder的效率几乎是十倍于AtomicInteger的自增方法。
性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1],Thread-2累加Cell[0]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
最后
以上就是能干马里奥最近收集整理的关于多线程(十五) -- 无锁(二) -- atomic相关原子类1. 前言2. 原子整数3.原子引用4. 原子数组5.字段更新器6.原子累加器的全部内容,更多相关多线程(十五)内容请搜索靠谱客的其他文章。
发表评论 取消回复