概述
1. 前言
java.util.concurrent.atomic并发包提供了一些并发工具类,这里把它分成五类:
- 使用原子的方式更新基本类型
- AtomicInteger:整型原子类
- AtomicLong:长整型原子类
- AtomicBoolean :布尔型原子类
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
2. 原子整数
以 AtomicInteger 为例讨论它的api接口:通过观察源码可以发现,AtomicInteger 内部都是通过cas的原理来实现的!
2.1 相关api:
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1值, 结果 i = 0, 返回 0)
// 函数式编程接口,其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
}
2.2 AtomicInteger源码:
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
/*
* This class intended to be implemented using VarHandles, but there
* are unresolved cyclic startup dependencies.
*/
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;
Unsafe是CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。不过尽管如此,JVM还是开了一个后门:Unsafe,它提供了硬件级别的原子操作。通过它来调用一些native方法。
VALUE 为变量值在内存中的偏移地址,unsafe就是通过偏移地址来得到数据的原值的。
value当前值,使用volatile修饰,保证多线程环境下看见的是同一个。
我们就以AtomicInteger的addAndGet()方法来做说明,看源代码:
2.3 addAndGet源码:
public final int addAndGet(int delta) {
return U.getAndAddInt(this, VALUE, delta) + delta;
}
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
// 自旋锁,直到能够更换为止
do {
// 获取内存地址中的值
v = getIntVolatile(o, offset);
} while (!weakCompareAndSetInt(o, offset, v, v + delta));
return v;
}
// 如果内存地址中的值offset和我传过来的值还是一样的
// 就将当前内存中的值赋为x
public final boolean weakCompareAndSetInt(Object o, long offset,
int expected,
int x) {
return compareAndSetInt(o, offset, expected, x);
}
// 四个参数,分别代表:对象、对象的地址、预期值、修改值
public final native boolean compareAndSetInt(Object o, long offset,
int expected,
int x);
3.原子引用
为什么需要原子引用类型?保证引用类型的共享变量是线程安全的(确保这个原子引用没有引用过别人)。
基本类型原子类只能更新一个变量,如果需要原子更新多个变量,需要使用引用类型原子类。
- AtomicReference:引用类型原子类
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
- AtomicMarkableReference :原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来,也可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题。
使用原子引用实现BigDecimal存取款的线程安全
下面这个是不安全的实现过程:
class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;
public DecimalAccountUnsafe(BigDecimal balance) {
this.balance = balance;
}
@Override
public BigDecimal getBalance() {
return balance;
}
@Override
public void withdraw(BigDecimal amount) {
BigDecimal balance = this.getBalance();
this.balance = balance.subtract(amount);
}
}
解决代码如下:在AtomicReference类中,存在一个value类型的变量,保存对BigDecimal对象的引用。
class DecimalAccountCas implements DecimalAccount{
//private BigDecimal balance;
private AtomicReference<BigDecimal> balance ;
public DecimalAccountCas(BigDecimal balance) {
this.balance = new AtomicReference<>(balance);
}
@Override
public BigDecimal getBalance() {
return balance.get();
}
@Override
public void withdraw(BigDecimal amount) {
while(true){
BigDecimal pre = balance.get();
// 注意:这里的balance返回的是一个新的对象,即 pre!=next
BigDecimal next = pre.subtract(amount);
if (balance.compareAndSet(pre,next)){
break;
}
}
}
}
3.1 ABA问题:
如下程序所示,虽然在other方法中存在两个线程对共享变量进行了修改,但是修改之后又变成了原值,main线程中对此是不可见得,这种操作对业务代码并无影响:
static AtomicReference<String> ref = new AtomicReference<>("A");
public static void main(String[] args) throws InterruptedException {
log.debug("main start");
// 获取值A
String prev = ref.get();
other();
Thread.sleep(1000);
log.debug("A -> C {}", ref.compareAndSet(prev, "C"));
}
private static void other() throws InterruptedException {
new Thread(() -> {
log.debug("A -> B {}", ref.compareAndSet(ref.get(), "B"));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
// 注意:如果这里使用 log.debug("change B->A {}", ref.compareAndSet(ref.get(), new String("A")));
// 那么此实验中的 log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
// 打印的就是false, 因为new String("A") 返回的对象的引用和"A"返回的对象的引用是不同的!
log.debug("B -> A {}", ref.compareAndSet(ref.get(), "A"));
}, "t2").start();
}
结果:
13:49:06.903 [main] DEBUG aba - main start
13:49:06.909 [t1] DEBUG aba - A -> B true
13:49:07.415 [t2] DEBUG aba - B -> A true
13:49:08.425 [main] DEBUG aba - A -> C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号。使用AtomicStampedReference来解决。
3.2 ABA问题解决:AtomicStampedReference
Java提供了AtomicStampedReference来解决。AtomicStampedReference通过包装[E,Integer]的元组来对对象标记版本戳stamp,从而避免ABA问题。
AtomicStampedReference的compareAndSet()方法定义如下:
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
compareAndSet有四个参数,分别表示:预期引用、更新后的引用、预期标志、更新后的标志。
如果更新后的引用和标志和当前的引用和标志相等则直接返回true,否则通过Pair生成一个新的pair对象与当前pair CAS替换。Pair为AtomicStampedReference的内部类,主要用于记录引用和版本戳信息(标识),定义如下:
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
Pair记录着对象的引用和版本戳,版本戳为int型,保持自增。同时Pair是一个不可变对象,其所有属性全部定义为final,对外提供一个of方法,该方法返回一个新建的Pari对象。pair对象定义为volatile,保证多线程环境下的可见性。在AtomicStampedReference中,大多方法都是通过调用Pair的of方法来产生一个新的Pair对象,然后赋值给变量pair。如set方法:
public void set(V newReference, int newStamp) {
Pair<V> current = pair;
if (newReference != current.reference || newStamp != current.stamp)
this.pair = Pair.of(newReference, newStamp);
}
3.2.1 使用AtomicStampedReference解决aba问题:
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A",0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
int stamp = ref.getStamp();
log.info("main stamp:{}",stamp);
String prev = ref.getReference();
other();
Thread.sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, "C",stamp,stamp+1));
}
private static void other() throws InterruptedException {
new Thread(() -> {
int stamp = ref.getStamp();
log.info("t1 stamp:{}",stamp);
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",stamp,stamp+1));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
int stamp = ref.getStamp();
log.info("t2 stamp:{}",stamp);
log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",stamp,stamp+1));
}, "t2").start();
}
结果:
14:13:48.082 [main] DEBUG aba - main start...
14:13:48.086 [main] INFO aba - main stamp:0
14:13:48.089 [t1] INFO aba - t1 stamp:0
14:13:48.090 [t1] DEBUG aba - change A->B true
14:13:48.603 [t2] INFO aba - t2 stamp:1
14:13:48.603 [t2] DEBUG aba - change B->A true
14:13:49.617 [main] DEBUG aba - change A->C false
3.2.3 compare比较的是地址:
同时,从上面compareAndSet的源码我们看到compare比较的比较方式都是用的双等号,即比较的是引用的地址,如果是使用Integer类型,1000==1000
的判断是false, 对于其他的对象,只有同一个new出来的对象才是相等的。
代码示例:
static AtomicStampedReference<Integer> ref = new AtomicStampedReference<>(200,0);
public static void main(String[] args) throws InterruptedException {
log.debug("main start...");
// 获取值 A
int stamp = ref.getStamp();
log.info("main stamp:{}",stamp);
Integer prev = ref.getReference();
other();
Thread.sleep(1000);
// 尝试改为 C
log.debug("change A->C {}", ref.compareAndSet(prev, 400,stamp,stamp+1));
}
private static void other() throws InterruptedException {
new Thread(() -> {
int stamp = ref.getStamp();
log.info("t1 stamp:{}",stamp);
log.debug("change A->B {}", ref.compareAndSet(200, 300,stamp,stamp+1));
}, "t1").start();
Thread.sleep(500);
new Thread(() -> {
int stamp = ref.getStamp();
log.info("t2 stamp:{}",stamp);
log.debug("change B->A {}", ref.compareAndSet(300, 200,stamp,stamp+1));
}, "t2").start();
}
解决方式:
// 比较时获取,或者值小于128
log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), 300,stamp,stamp+1));
3.3 AtomicMarkableReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如:A -> B -> A ->C,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
源码:
public boolean compareAndSet(V expectedReference,
V newReference,
boolean expectedMark,
boolean newMark) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedMark == current.mark &&
((newReference == current.reference &&
newMark == current.mark) ||
casPair(current, Pair.of(newReference, newMark)));
}
通过AtomicMarkableReference的源码我们可以看到,他的标识是布尔类型的,也就是说,我们不需要关心更新了几次,我们只关心有没有更新。
4. 原子数组
使用原子的方式更新数组里的某个元素,主要包含三个:
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
关于函数式编程请看:四大函数式接口
代码演示:
创建一个长度为10的数组,然后创建10个线程,每个线程循环10000次,分别对数组下标0~9的元素自行自增操作,最后查看数组
public class TestAtomicArray {
public static void main(String[] args) {
TestAtomicArray.demo(
()->new AtomicIntegerArray(10),
(array)-> array.length(),
(array,index)-> array.getAndIncrement(index),
(array)->System.out.println(array)
);
TestAtomicArray.demo(
()->new int[10],
(array)-> array.length,
(array,index)-> array[index]++,
(array)->System.out.println(Arrays.toString(array))
);
}
/**
参数1,提供数组、可以是线程不安全数组或线程安全数组
参数2,获取数组长度的方法
参数3,自增方法,传 array, index两个参数 array为数组,index为数组元素每次自增的元素的下标
参数4,打印数组的方法
*/
// supplier 提供者 无中生有 ()->结果
// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果
// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->
public static <T> void demo(
Supplier<T> arraySupplier,
Function<T,Integer> lengthFun,
BiConsumer<T,Integer> putConsumer,
Consumer<T> printConsumer){
List<Thread> ts = new ArrayList<>();
T array = arraySupplier.get();
Integer length = lengthFun.apply(array);
for (int i = 0;i<length;i++){
ts.add(new Thread(()->{
for (int j=0;j<10000;j++){
putConsumer.accept(array,j%length);
}
}));
}
ts.forEach(t->t.start());
ts.forEach(t->{
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
printConsumer.accept(array);
}
}
结果:
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
[6857, 6840, 6815, 6802, 6825, 6847, 6845, 6850, 6824, 6768]
可以看到使用atomic初始化的数组,正确地输出了结果
5.字段更新器
使用原子的方式更新对象中的属性,主要包括三个类:
- AtomicReferenceFieldUpdater
- AtomicIntegerFieldUpdater // 字段类型是integer
- AtomicLongFieldUpdater // 字段类型是long
注意,属性必须用volatile来修饰
字段更新器使用:
public class TestAtomicField {
public static void main(String[] args) {
Student student = new Student();
AtomicReferenceFieldUpdater u = AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");
System.out.println(u.compareAndSet(student, null, "张三"));
}
}
@Data
class Student {
// 注意需要用volatile修饰保证可见性
volatile String name;
}
结果:
true
6.原子累加器
顾名思义,原子累加器,就是对数字进行累加操作;
在jdk8之后,jdk专门给我们新增了几个用于累加的类:
代码展示:
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
demo(() -> new LongAdder(), adder -> adder.increment());
}
for (int i = 0; i < 5; i++) {
demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());
}
}
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {
T adder = adderSupplier.get();
long start = System.nanoTime();
List<Thread> ts = new ArrayList<>();
// 4个线程,每人累加 50 万
for (int i = 0; i < 4; i++) {
ts.add(new Thread(() -> {
for (int j = 0; j < 500000; j++) {
action.accept(adder);
}
}));
}
ts.forEach(t -> t.start());
ts.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.nanoTime();
System.out.println(adder + " cost:" + (end - start)/1000_000);
}
结果:
20000000 cost:91
20000000 cost:72
20000000 cost:43
20000000 cost:45
20000000 cost:51
20000000 cost:558
20000000 cost:551
20000000 cost:579
20000000 cost:514
20000000 cost:503
可以看出,LongAdder的效率几乎是十倍于AtomicInteger的自增方法。
性能提升的原因很简单,就是在有竞争时,设置多个累加单元(但不会超过cpu的核心数),Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1],Thread-2累加Cell[0]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
最后
以上就是能干马里奥为你收集整理的多线程(十五) -- 无锁(二) -- atomic相关原子类1. 前言2. 原子整数3.原子引用4. 原子数组5.字段更新器6.原子累加器的全部内容,希望文章能够帮你解决多线程(十五) -- 无锁(二) -- atomic相关原子类1. 前言2. 原子整数3.原子引用4. 原子数组5.字段更新器6.原子累加器所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复