我是靠谱客的博主 现实饼干,最近开发中收集的这篇文章主要介绍JUC之 atomic——核心CAS 原子操作atomicCAS的缺点:,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

 

 

 

因为Atomic系列的原子类,无论在并发编程、JDK源码、还是各种开源项目中,都经常用到。而且在Java并发面试中,这一块也属于比较高频的考点,所以还是值得给大家聊一聊。

 

 

二、场景引入,问题凸现

比如说下面这段代码: 

import java.util.concurrent.CountDownLatch;

public class VolatileTest {
    
    public static volatile int race = 0;
 
    private static final int THREADS_COUNT = 20;
 
    private static CountDownLatch countDownLatch = new CountDownLatch(THREADS_COUNT);
 
    public static void increase() {
        race++;
    }
 
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[THREADS_COUNT];
        for (int i = 0; i < THREADS_COUNT; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        increase();
                    }
                    countDownLatch.countDown();
                }
            });
            threads[i].start();
        }
        countDownLatch.await();
        System.out.println(race);
    }
}

 

运行完这段代码之后,并不会获得期望的结果,而且会发现每次运行程序,输出的结果都不一样,都是一个小于200000的数字。

这是因为volatile只能保证可见性,无法保证原子性,而自增操作并不是一个原子操作

 

三、初步的解决方案:synchronized

 

解决方法:

首先我们想到的是用synchronized来修饰increase方法。

用synchronized来修饰increase方法。

 

这个时候,代码就是线程安全的了,因为我们加了synchronized,也就是让每个线程要进入increase()方法之前先得尝试加锁,同一时间只有一个线程能加锁,其他线程需要等待锁。

 

通过这样处理,不会出现数据错乱的问题。

 

 

但是,如此简单的data++操作,都要加一个重的synchronized锁来解决多线程并发问题,就有点杀鸡用牛刀,大材小用了。

 

虽然随着Java版本更新,也对synchronized做了很多优化,但是处理这种简单的累减操作,仍然显得“太重了”。

 

而且,在这个场景下,你要是用synchronized,不就相当于让各个线程串行化了么?一个接一个的排队,加锁,处理数据,释放锁,下一个再进来。

 

使用synchronized修饰后,increase方法变成了一个原子操作,因此是肯定能得到正确的结果。但是,我们知道,每次自增都进行加锁,性能可能会稍微差了点,有更好的方案吗?

 

答案当然是有的,这个时候我们可以使用Java并发包原子操作类(Atomic开头)

 

四、更高效的方案:Atomic原子类及其底层原理

 

对于这种简单的data--或者data++类的操作,其实我们完全可以换一种做法,java并发包下面提供了一系列的Atomic原子类,比如说AtomicInteger。

 

我们将例子中的代码稍做修改:race改成使用AtomicInteger定义,“race++”改成使用“race.getAndIncrement()”,AtomicInteger.getAndIncrement()是原子操作,因此我们可以确保每次都可以获得正确的结果,并且在性能上有不错的提升
 

 

他可以保证多线程并发安全的情况下,高性能的并发更新一个数值。我们来看下面的代码:

public class AtomicIntegerTest {
    static AtomicInteger i = new AtomicInteger();

    public static class AddThread implements Runnable {
        @Override
        public void run() {
            for (int k = 0; k < 10000; k++) {
                i.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0; k < 10; k++) {
            ts[k] = new Thread(new AddThread());
        }
        for (int k = 0; k < 10; k++) {
            ts[k].start();
        }
       
        System.out.println(i);
    }
}

 

大家看上面的代码,是不是很简单!多个线程可以并发的执行AtomicInteger的incrementAndGet()方法,意思就是给我把值累加1,接着返回累加后最新的值。

 

 

 

实际上,Atomic原子类底层用的不是传统意义的锁机制,而是无锁化的CAS机制,通过CAS机制保证多线程修改一个数值的安全性

 

什么是CAS ?

compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作。
CAS必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果。
volatile修饰的共享变量,会保证每次读操作都会从主内存中获取最新值。在CAS操作中,最新值与CAS代码中的获取值不一致的时候,会重新获取最新值并再次比较。当一致的时候,就会进行数值的修改,以此来保证数据的安全性。

CAS 的特点

结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我再重试。
synchronized是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。

CAS 体现的是无锁并发、无阻塞并发

1、没有使用 synchronized,所以线程不会陷入阻塞,这是效率提升的因素之一
2、但如果竞争激烈,可以想到重试必然频繁发生,反而效率会受影响
 

乐观锁与悲观所的区别在于 -- 乐观锁趋向于不加锁来处理资源比如给记录加入version这种方法 记录版本号 他将内存地址的内容和定值相比较 只有在相同的情况下,才会更新期望值如果已经被另一个线程操作更新则更新失败 ! 会返回boolean来判断

CAS的底层原理 ??

整体流程

 

底层大部分都是有unsafe完成,unsafe自己属于JDK-- sun包下的,在我查看源码发现unsafe是 是CAS的核心类 由于Java 方法无法直接访问底层 ,需要通过本地(native)方法来访问,UnSafe相当于一个后门,基于该类可以直接操作特定的内存数据. UnSafe类在于sun.misc包中,其内部方法操作可以向C的指针一样直接操作内存,因为Java中CAS操作的执行依赖于UNSafe类的方法.

**注意UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务**

整体流程

valueoff是该变量在内存中的偏移地址 ,unsafe就是根据内存偏移地址来获取数据的!

变量value 被volatile修饰 保证了多线程的可见性

在多处理器情况下必须使用lock指令加锁来完成。从这个例子就可以比较清晰的了解CAS的底层实现了, 当然不同的操作系统和处理器的实现会有所不同,大家可以自行了解。 计算机并发原语是叫这个吧哈哈!!

整体流程

此处为jdk实现

   public final int getAndAddInt(Object var1, long var2, int var4) {
          int var5;
          do {
              //兄弟们告诉我var1 和var2 定位到的内存地址的值是多少
              var5 = this.getIntVolatile(var1, var2);
          } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
  
          return var5;
      }
      
      1.var1 就是 AtomicInteger new 的本身
      var2 该对象的引用地址 就是偏移量 valueoff
      var4 需要变动的数量
      var5是通过var1和var2找到的主内存中的真实值
      用该对象当前的值与var5比较 如果相同 更新var5+var4并返回true 如果不同继续循环比较直至更新完成!

假设线程A和线程B两个线程同时执行getAndAddInt操作(分别在不同的CPU上):

1.AtomicInteger里面的value原始值为3,即主内存中AtomicInteger的value为3,根据JMM模型,线程A和线程B各自持有一份值为3的value的副本分别到各自的工作内存.

2.线程A通过getIntVolatile(var1,var2) 拿到value值3,这是线程A被挂起.

3.线程B也通过getIntVolatile(var1,var2) 拿到value值3,此时刚好线程B没有被挂起并执行compareAndSwapInt方法比较内存中的值也是3 成功修改内存的值为4 线程B打完收工 一切OK.

4.这是线程A恢复,执行compareAndSwapInt方法比较,发现自己手里的数值和内存中的数字4不一致,说明该值已经被其他线程抢先一步修改了,那A线程修改失败,只能重新来一遍了.

5.线程A重新获取value值,因为变量value是volatile修饰,所以其他线程对他的修改,线程A总是能够看到,线程A继续执行compareAndSwapInt方法进行比较替换,直到成功.

  CAS并发原语提现在Java语言中就是sun.miscUnSaffe类中的各个方法.调用UnSafe类中的CAS方法,
  JVM会帮我实现CAS汇编指令.这是一种完全依赖于硬件 功能,通过它实现了原子操作,再次强调,
  由于CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成,
  用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许中断,也即是说CAS是一条原子指令
  ,不会造成所谓的数据不一致的问题.

atomic底下常用的类

  • AtomicBoolean
  • AtomicInteger
  • AtomicIntegerArray
  • AtomicIntegerFieldUpdater
  • AtomicLong
  • AtomicLongArray
  • AtomicLongFieldUpdater
  • AtomicMarkableReference
  • AtomicReference
  • AtomicReferenceArray
  • AtomicReferenceFieldUpdater
  • AtomicStampedReference
  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

原子整数(整数)

AtomicBoolean
AtomicInteger
AtomicLong

常用方法

getAndIncrement() 类似于 i++
incrementAndGet() 类似于 ++i
decrementAndGet() 类似于–i
getAndDecrement() 类似于 i–
getAndAdd(5) 先获取i再加5
addAndGet(-5) 先加5再获取i
getAndUpdate(p -> p * 2) 进行乘除操作
.updateAndGet(p -> p * 2) 进行乘除操作


原子引用(小数、字符串。。)


AtomicReference
AtomicMarkableReference
AtomicStampedReference
AtomicReference<BigDecimal> bigdecimal = new AtomicReference<>(new BigDecimal("10.1"));
AtomicReference<String> str = new AtomicReference<>("A");
 

原子数组


AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
保护数组的元素

字段更新器和原子累加器
字段更新器

AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater

利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常
 

public class Test{

    public static void main(String[] args) {
        Student stu = new Student();

        AtomicReferenceFieldUpdater updater =
                AtomicReferenceFieldUpdater.newUpdater(Student.class, String.class, "name");

        System.out.println(updater.compareAndSet(stu, null, "张三"));
        System.out.println(stu);
    }
}

class Student {
    volatile String name;

    @Override
    public String toString() {
        return "Student{" +
                "name='" + name + ''' +
                '}';
    }
}

 

原子累加器

LongAdder
LongAdder性能比AtomicLong好
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
 

CAS的缺点:

CAS虽然很高效的解决了原子操作问题,但是CAS仍然存在三大问题。

  1. 循环时间长开销很大。
  2. 只能保证一个变量的原子操作。
  3. ABA问题。

 

循环时间长开销很大:

CAS 通常是配合无限循环一起使用的,我们可以看到 getAndAddInt 方法执行时,如果 CAS 失败,会一直进行尝试。如果 CAS 长时间一直不成功,可能会给 CPU 带来很大的开销。

 

只能保证一个变量的原子操作:

当对一个变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个变量操作时,CAS 目前无法直接保证操作的原子性。但是我们可以通过以下两种办法来解决:1)使用互斥锁来保证原子性;2)将多个变量封装成对象,通过 AtomicReference 来保证原子性。

 

什么是ABA问题?ABA问题怎么解决?

CAS 的使用流程通常如下:1)首先从地址 V 读取值 A;2)根据 A 计算目标值 B;3)通过 CAS 以原子的方式将地址 V 中的值从 A 修改为 B。

但是在第1步中读取的值是A,并且在第3步修改成功了,我们就能说它的值在第1步和第3步之间没有被其他线程改变过了吗?

如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操作的“ABA”问题。Java并发包为了解决这个问题,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证CAS的正确性。因此,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。


@Getter@Setter@AllArgsConstructor@ToString
class User{
    private String name;
    private int age;
}
public class AtomicReferenceDemo {
    public static void main(String[] args) {
        User zs = new User("zs", 22);
        User ls = new User("ls", 22);
        AtomicReference<User> userAtomicReference = new AtomicReference<>();
        userAtomicReference.set(zs);
        System.out.println(userAtomicReference.compareAndSet(zs, ls)+"t"+userAtomicReference.get().toString());
        System.out.println(userAtomicReference.compareAndSet(zs, ls)+"t"+userAtomicReference.get().toString());
    }
}
 


public class ABADemo {
    private static AtomicReference<Integer> atomicReference=new AtomicReference<>(100);
    private static AtomicStampedReference<Integer> stampedReference=new AtomicStampedReference<>(100,1);
    public static void main(String[] args) {
        System.out.println("===以下是ABA问题的产生===");
        new Thread(()->{
            atomicReference.compareAndSet(100,101);
            atomicReference.compareAndSet(101,100);
        },"t1").start();

        new Thread(()->{
            //先暂停1秒 保证完成ABA
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            System.out.println(atomicReference.compareAndSet(100, 2019)+"t"+atomicReference.get());
        },"t2").start();
        try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
        System.out.println("===以下是ABA问题的解决===");

        new Thread(()->{
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"t 第1次版本号"+stamp+"t值是"+stampedReference.getReference());
            //暂停1秒钟t3线程
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }

            stampedReference.compareAndSet(100,101,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"t 第2次版本号"+stampedReference.getStamp()+"t值是"+stampedReference.getReference());
            stampedReference.compareAndSet(101,100,stampedReference.getStamp(),stampedReference.getStamp()+1);
            System.out.println(Thread.currentThread().getName()+"t 第3次版本号"+stampedReference.getStamp()+"t值是"+stampedReference.getReference());
        },"t3").start();

        new Thread(()->{
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName()+"t 第1次版本号"+stamp+"t值是"+stampedReference.getReference());
            //保证线程3完成1次ABA
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
            boolean result = stampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName()+"t 修改成功否"+result+"t最新版本号"+stampedReference.getStamp());
            System.out.println("最新的值t"+stampedReference.getReference());
        },"t4").start();
    }
 

 private static AtomicReference<Integer> atomicReference=new AtomicReference<>(100);
    private static AtomicStampedReference<Integer> stampedReference=new AtomicStampedReference<>(100,1);
    public static void main(String[] args) {
        System.out.println("===以下是ABA问题的产生===");
        new Thread(() -> {
            atomicReference.compareAndSet(100, 101);
            atomicReference.compareAndSet(101, 100);
        }, "t1").start();

        new Thread(() -> {
            //先暂停1秒 保证完成ABA
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicReference.compareAndSet(100, 2019) + "t" + atomicReference.get());
        }, "t2").start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("===以下是ABA问题的解决===");

        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "t 第1次版本号" + stamp + "t值是" + stampedReference.getReference());
            //暂停1秒钟t3线程
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            stampedReference.compareAndSet(100, 101, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "t 第2次版本号" + stampedReference.getStamp() + "t值是" + stampedReference.getReference());
            stampedReference.compareAndSet(101, 100, stampedReference.getStamp(), stampedReference.getStamp() + 1);
            System.out.println(Thread.currentThread().getName() + "t 第3次版本号" + stampedReference.getStamp() + "t值是" + stampedReference.getReference());
        }, "t3").start();

        new Thread(() -> {
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + "t 第1次版本号" + stamp + "t值是" + stampedReference.getReference());
            //保证线程3完成1次ABA
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("====================="+stampedReference.getStamp());
            boolean result = stampedReference.compareAndSet(100, 2019, stamp, stamp + 1);
            System.out.println(Thread.currentThread().getName() + "t 修改成功否" + result + "t最新版本号" + stampedReference.getStamp());
            System.out.println("最新的值t" + stampedReference.getReference());
        }, "t4").start();
    }

但是有时候,并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference,
boolean success = ref.compareAndSet(prev, next, true, false); 

五、Java 8对CAS机制的优化

 

但是这个CAS有没有问题呢?肯定是有的。比如说大量的线程同时并发修改一个AtomicInteger,可能有很多线程会不停的自旋,进入一个无限重复的循环中。

 

这些线程不停地获取值,然后发起CAS操作,但是发现这个值被别人改过了,于是再次进入下一个循环,获取值,发起CAS操作又失败了,再次进入下一个循环。

 

在大量线程高并发更新AtomicInteger的时候,这种问题可能会比较明显,导致大量线程空循环,自旋转,性能和效率都不是特别好。

 

于是,当当当当,Java 8推出了一个新的类,LongAdder,他就是尝试使用分段CAS以及自动分段迁移的方式来大幅度提升多线程高并发执行CAS操作的性能!

 

 

在LongAdder的底层实现中,首先有一个base值,刚开始多线程来不停的累加数值,都是对base进行累加的,比如刚开始累加成了base = 5。

 

接着如果发现并发更新的线程数量过多,就会开始施行分段CAS的机制,也就是内部会搞一个Cell数组,每个数组是一个数值分段。

 

这时,让大量的线程分别去对不同Cell内部的value值进行CAS累加操作,这样就把CAS计算压力分散到了不同的Cell分段数值中了!

 

这样就可以大幅度的降低多线程并发更新同一个数值时出现的无限循环的问题,大幅度提升了多线程并发更新数值的性能和效率!

 

而且他内部实现了自动分段迁移的机制,也就是如果某个Cell的value执行CAS失败了,那么就会自动去找另外一个Cell分段内的value值进行CAS操作。

 

这样也解决了线程空旋转、自旋不停等待执行CAS操作的问题,让一个线程过来执行CAS时可以尽快的完成这个操作。

 

最后,如果你要从LongAdder中获取当前累加的总值,就会把base值和所有Cell分段数值加起来返回给你。

 

 

六、总结 & 思考

 

不知道大家有没有发现这种高并发访问下的分段处理机制,在很多地方都有类似的思想体现!因为高并发中的分段处理机制实际上是一个很常见和常用的并发优化手段。

 

 

最后

以上就是现实饼干为你收集整理的JUC之 atomic——核心CAS 原子操作atomicCAS的缺点:的全部内容,希望文章能够帮你解决JUC之 atomic——核心CAS 原子操作atomicCAS的缺点:所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(64)

评论列表共有 0 条评论

立即
投稿
返回
顶部