我是靠谱客的博主 稳重冥王星,最近开发中收集的这篇文章主要介绍Java并发编程常用API解析,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

在 Java 5.0 提供了 java.util.concurrent (简称JUC )包,在此包中增加了在并发编程中很常用的实用工具类,用于定义类似于线程的自定义子系统,包括线程池、异步 IO 和轻量级任务框架。提供可调的、灵活的线程池。还提供了设计用于多线程上下文中的 Collection 实现等。

volatile

内存可见性

  • 内存可见性(Memory Visibility)是指当某个线程正在使用对象状态而另一个线程在同时修改该状态,需要确保当一个线程修改了对象状态后,其他线程能够看到发生的状态变化。
  • 可见性错误是指当读操作与写操作在不同的线程中执行时,我们无法确保执行读操作的线程能适时地看到其他线程写入的值,有时甚至是根本不可能的事情。
  • 我们可以通过同步来保证对象被安全地发布。除此之外我们也可以使用一种更加轻量级的 volatile 变量。

volatile关键字

Java 提供了一种稍弱的同步机制,即 volatile 变量,用来确保将变量的更新操作通知到其他线程。
可以将 volatile 看做一个轻量级的锁,但是又与锁有些不同:
- 对于多线程,不是一种互斥关系
- 不能保证变量状态的“原子性操作”

        /**
         * volatile关键字:当多个线程进行操作共享数据时,可以保证内存中数据可见.
         * 相较于synchronized是一种较为轻量级的同步策略.
         * volatile特点:
         * 1.volatile不具备互斥性
         * 2.volatile不能保证变量的"原子性"
         */
        public class TestVolatile {

            public static void main(String[] args) {
                ThreadDemo threadDemo = new ThreadDemo();
                //启动线程
                new Thread(threadDemo).start();

                while (true) {
        //          synchronized (threadDemo) {
                        if (threadDemo.isFlag()) {
                            System.out.println("main======");
                            break;
                        }
        //          }
                }
            }

        }

        class ThreadDemo extends Thread{
            //标记
            private volatile boolean flag = false;

            public void run(){
                try {
                    //线程睡眠
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //修改标记
                flag = true;
                System.out.println(getName() + ": flag= " + isFlag());
            }

            public boolean isFlag() {
                return flag;
            }

            public void setFlag(boolean flag) {
                this.flag = flag;
            }

        }

CAS算法

CAS 算法

CAS (Compare-And-Swap) 是一种硬件对并发的支持,针对多处理器操作而设计的处理器中的一种特殊指令,用于管理对共享数据的并发访问。
- CAS 是一种无锁的非阻塞算法的实现。
- CAS 包含了 3 个操作数:
- 需要读写的内存值 V
- 进行比较的值 A
- 拟写入的新值 B
- 当且仅当 V 的值等于 A 时,CAS 通过原子方式用新值 B 来更新 V 的值,否则不会执行任何操作。

原子变量

类的小工具包,支持在单个变量上解除锁的线程安全编程。事实上,此包中的类可将 volatile 值、字段和数组元素的概念扩展到那些也提供原子条件更新操作的类。
- 类 AtomicBoolean、AtomicInteger、AtomicLong 和 AtomicReference 的实例各自提供对
相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。
- AtomicIntegerArray、AtomicLongArray 和 AtomicReferenceArray 类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供 volatile 访问语义方面也引人注目,这对于普通数组来说是不受支持的。
- 核心方法:boolean compareAndSet(expectedValue, updateValue)
- java.util.concurrent.atomic 包下提供了一些原子操作的常用类:
- AtomicBoolean 、AtomicInteger 、AtomicLong 、 AtomicReference
- AtomicIntegerArray 、AtomicLongArray
- AtomicMarkableReference
- AtomicReferenceArray
- AtomicStampedReference

    /**
         * i++ 原子性问题:
         * i++ 操作分为三步骤:"读-改-写"
         *  int temp = i;
         *  i = i + 1;
         *  i = temp;
         * 
         * 原子变量:jdk1.5后java.util.concurrent.atomic 包下提供了常用的原子变量:
         *  1.volatile 保证内存可见性
         *  2.CAS(Compare-And-Swap)算法保证数据的原子性,
         *      CAS算法是硬件对于并发操作共享数据的支持
         *      CAS包含了三个操作数:
         *          内存值 V
         *          预估值 A
         *          更新值 B
         *      当且仅当 V == A 时,V = B,否则,将不进行任何操作
         */
        public class TestAtomicDemo {

            public static void main(String[] args) {
                AtomicDemo atomicDemo = new AtomicDemo();
                //多个线程执行
                for (int i = 0; i < 10; i++) {
                    new Thread(atomicDemo).start();
                }
            }
        }

        class AtomicDemo implements Runnable {

        //  private int num = 0;
            private AtomicInteger num = new AtomicInteger();

            @Override
            public void run() {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " :" + getNum());
            }

            public int getNum() {
                return num.getAndIncrement();
            }

        }

ConcurrentHashMap 锁分段机制

  • Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。
  • ConcurrentHashMap 同步容器类是Java 5 增加的一个线程安全的哈希表。对与多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用“锁分段”机制替代 Hashtable 的独占锁。进而提高性能。
  • 此包还提供了设计用于多线程上下文中的 Collection 实现:
    ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。当期望许多线程访问一个给定 collection 时,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步的 TreeMap。当期望的读数和遍历远远大于列表的更新数时,CopyOnWriteArrayList 优于同步的 ArrayList。
        /**
         * CopyOnWriteArrayList/CopyOnWriteArraySet "写入复制"
         * 注意:添加操作多时,效率低,因为每次添加时都会进行复制,开销非常大.
         */
        public class TestCopyOnWriterArrayList {

            public static void main(String[] args) {
                CurrentThread currentThread = new CurrentThread();

                for (int i = 0; i < 10; i++) {
                    new Thread(currentThread).start();
                }
            }
        }

        class CurrentThread implements Runnable{

        //  private static List<String> list = Collections.synchronizedList(new ArrayList<String>());

            private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(); 

            static{
                list.add("a");
                list.add("b");
                list.add("c");
            }

            @Override
            public void run() {
                Iterator<String> iterator = list.iterator();

                while (iterator.hasNext()) {
                    System.out.println(iterator.next());
                    list.add("m");
                }
            }

        }

CountDownLatch 闭锁

  • Java 5.0 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。
  • CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
  • 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行:
    • 确保某个计算在其需要的所有资源都被初始化之后才继续执行;
    • 确保某个服务在其依赖的所有其他服务都已经启动之后才启动;
    • 等待直到某个操作所有参与者都准备就绪再继续执行。
        /**
         *  CountDownLatch,闭锁:在完成某些运算时,只有其他所有线程的运算全部完成,当前运算才能继续执行
         */
        public class TestCountDownLatch {

            public static void main(String[] args) {
                CountDownLatch latch = new CountDownLatch(6);
                LatchDemo latchDemo = new LatchDemo(latch);

                long start = System.currentTimeMillis();

                for (int i = 0; i < 6; i++) {
                    new Thread(latchDemo).start();
                }

                try {
                    //等待
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long end = System.currentTimeMillis();

                System.out.println("消费时间为: " + (end - start));
            }
        }

        class LatchDemo implements Runnable{

            private CountDownLatch latch ;

            public LatchDemo(CountDownLatch latch) {
                this.latch = latch;
            }

            @Override
            public void run() {
                synchronized (this) {
                    try {
                        for (int i = 0; i < 10000; i++) {
                            if (i%2 == 1) {
                                System.out.println(i);
                            }
                        }
                    } finally {
                        //减1
                        latch.countDown();
                    }
                }

            }

        }

实现 Callable 接口创建线程

  • Java 5.0 在 java.util.concurrent 提供了一个新的创建执行线程的方式:Callable 接口
  • Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable 不会返回结果,并且无法抛出经过检查的异常。
  • Callable 需要依赖FutureTask ,FutureTask 也可以用作闭锁。
        /**
         * 实现Callable接口创建执行线程,相较于实现Runnable接口的方式,方法有返回值,并且可以抛出异常
         * 执行Callable方式,需要FutureTask实现类的支持,用于接收运算结果.FutureTask是Future的实现类.
         */
        public class TestCallable {

            public static void main(String[] args) {
                CallableDemo cd = new CallableDemo();
                //创建FutureTask,用于接收结果
                FutureTask<Integer> result = new FutureTask<>(cd);
                //执行线程
                new Thread(result).start();
                try {
                    //获取返回值,FutureTask可用于闭锁
                    Integer sum = result.get();
                    System.out.println("从0到100的总和是:"+sum);
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }

        class CallableDemo implements Callable<Integer>{

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i <= 100; i++) {
                    System.out.println(i);
                    sum += i;
                }
                return sum;
            }

        }

Lock 同步锁

  • 在 Java 5.0 之前,协调共享对象的访问时可以使用的机制只有 synchronized 和 volatile 。Java 5.0 后增加了一些新的机制,但并不是一种替代内置锁的方法,而是当内置锁不适用时,作为一种可选择的高级功能。
  • ReentrantLock 实现了 Lock 接口,并提供了与synchronized 相同的互斥性和内存可见性。但相较于synchronized 提供了更高的处理锁的灵活性。
        /**
         * 用于解决多线程问题方式:
         * 1.同步代码块
         * 2.同步方式
         * 3.同步锁(jdk1.5之后)
         * Lock是一个显示锁,需要通过lock()方法上锁,必须通过unlock()方法进行释放锁.
         */
        public class TestLock {

            public static void main(String[] args) {
                Ticket ticket = new Ticket();

                //三个窗口同时卖票
                new Thread(ticket,"1号窗口").start();;
                new Thread(ticket,"2号窗口").start();;
                new Thread(ticket,"3号窗口").start();;
            }
        }

        class Ticket implements Runnable{

            private int ticket = 100;
            //lock锁
            private Lock lock = new ReentrantLock();

            @Override
            public void run() {
                //上锁
                lock.lock();
                try {
                    while (true) {
                        if (ticket > 0) {
                            Thread.sleep(10);
                            System.out.println(Thread.currentThread().getName() + "正在卖票,还剩" + --ticket + "张票");
                        } 
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally{
                    //释放锁
                    lock.unlock();
                }
            }

        }

Condition 控制线程通信

  • Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版本中的不同。
  • 在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是await、signal 和 signalAll。
  • Condition 实例实质上被绑定到一个锁上。要为特定 Lock 实例获得Condition 实例,请使用其 newCondition() 方法。
        /**
         * 生产者和消费者
         * 等待唤醒机制 
         * 虚假唤醒问题
         * 使用Lock和Condition 
         */
        public class TestProductAndCustomerForLock {

            public static void main(String[] args) {
                Clerk clerk = new Clerk();

                Product product = new Product(clerk);
                Consumer consumer = new Consumer(clerk);

                new Thread(product,"生产者A").start();
                new Thread(consumer,"消费者B").start();;
            }
        }

        /**
         * 店员
         */
        class Clerk {
            private int product = 0;
            //新建锁
            private Lock lock = new ReentrantLock();
            //新建condition
            private Condition condition = lock.newCondition();

            //进货
            public void get(){
                //上锁
                lock.lock();
                try {
                    while (product >= 10) { //为了避免虚假唤醒问题,应该总是使用在循环中
                        System.out.println("产品已满!");
                        try {
                            //等待
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + ":" + ++product);
                    //唤醒
                    condition.signalAll();
                } finally{
                    //释放锁
                    lock.unlock();
                }

            }

            //卖货
            public void sell(){
                //上锁
                lock.lock();

                try {
                    while (product <= 0) {//为了避免虚假唤醒问题,应该总是使用在循环中
                        System.out.println("缺货!");
                        try {
                            //等待
                            condition.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + ":"+ --product);
                    //唤醒
                    condition.signalAll();
                } finally{
                    //释放锁
                    lock.unlock();
                }
            }

        }

        /**
         * 生产者
         */
        class Product implements Runnable{

            private Clerk clerk;

            public Product(Clerk clerk) {
                this.clerk = clerk;
            }

            @Override
            public void run() {
                for (int i = 0; i < 20; i++) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //店员进货
                    clerk.get();
                }
            }

        }

        /**
         * 消费者
         */
        class Consumer implements Runnable{

            private Clerk clerk;

            public Consumer(Clerk clerk) {
                this.clerk = clerk;
            }

            @Override
            public void run() {
                for (int i = 0; i < 20; i++) {
                    //店员卖货
                    clerk.sell();
                }
            }

        }

ReadWriteLock 读写锁

  • ReadWriteLock 维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有 writer,读取锁可以由多个 reader 线程同时保持。写入锁是独占的。。
  • ReadWriteLock 读取操作通常不会改变共享资源,但执行写入操作时,必须独占方式来获取锁。对于读取操作占多数的数据结构。 ReadWriteLock 能提供比独占锁更高的并发性。而对于只读的数据结构,其中包含的不变性可以完全不需要考虑加锁操作。
        /**
         * ReadWriteLock:读写锁
         * 写写/读写 需要"互斥"
         * 读读 不需要"互斥"
         */
        public class TestReadWriteLock {

            public static void main(String[] args) {
                final ReadWriteLockDemo rwld = new ReadWriteLockDemo();

                //写
                new Thread(new Runnable() {

                    @Override
                    public void run() {
                        rwld.write((int)Math.random()*101);
                    }
                },"write : ").start();

                //读
                for (int i = 0; i < 100; i++) {
                    new Thread(new Runnable() {

                        @Override
                        public void run() {
                            rwld.read();
                        }
                    },"read: ").start();;
                }
            }
        }

        class ReadWriteLockDemo {
            private int num = 0;
            //创建读写锁
            private ReadWriteLock lock = new ReentrantReadWriteLock();
            //读
            public void read(){
                //上锁
                lock.readLock().lock();
                try {
                    System.out.println(Thread.currentThread().getName()+":"+num);
                } finally{
                    //释放锁
                    lock.readLock().unlock();
                }
            }

            //写
            public void write(int num){
                //上锁
                lock.writeLock().lock();
                try {
                    System.out.println(Thread.currentThread().getName() + ":" + num);
                    this.num = num;
                } finally{
                    //释放锁
                    lock.writeLock().unlock();
                }
            }
        }

线程池Executor

  • 第四种获取线程的方法:线程池,一个 ExecutorService,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。
  • 线程池可以解决两个不同问题:由于减少了每个任务调用的开销,它们通常可以在执行大量异步任务时提供增强的性能,并且还可以提供绑定和管理资源(包括执行任务集时使用的线程)的方法。每个 ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。
  • 为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。但
    是,强烈建议程序员使用较为方便的 Executors 工厂方法 :
    • Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)
    • Executors.newFixedThreadPool(int)(固定大小线程池)
    • Executors.newSingleThreadExecutor()(单个后台线程)
      它们均为大多数使用场景预定义了设置。
        /**
         * 线程池:提供了一个线程队列,队列中保存着所有等待状态的线程.避免了创建与销毁额外开销,提高响应的速度
         * 线程池体系结构:
         * java.util.concurrent.Executor:负责线程的使用与调度的根接口
         *      |--ExcutorExecutor 子接口:线程池的主要接口
         *          |--ThreadPoolExecutor:实现类
         *          |--ScheduledExecutorService 子接口:负责线程的调度
         *              |--ScheduledThreadPoolExecutor:继承ThreadPoolExecutor,实现ScheduleExecutorService
         * 
         * 工具类:Executors
         * ExecutorService newFixedThreadPool():创建固定大小的连接池
         * ExecutorService newCachedThreadPool():缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
         * ExecutorService newSingleThreadPoolI):创建单个线程池,线程池中只有一个线程
         * 
         * ScheduledThreadPoolExecutor newScheduledThreadPool():创建固定大小的线程池,可以延迟或者定时完成任务
         */
        public class TestThreadPool {

            public static void main(String[] args) throws Exception {
                //1.创建线程池
                ExecutorService pool = Executors.newFixedThreadPool(6);

                ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();

                for (int i = 0; i < 5; i++) {
                    //2.为线程池中线程分配任务
                    pool.submit(threadPoolDemo);
                }

                //创建list接收Future
                List<Future<Integer>> list = new ArrayList<>();

                for (int i = 0; i < 5; i++) {
                    //为线程池中线程分配任务
                    Future<Integer> future = pool.submit(new Callable<Integer>() {

                        @Override
                        public Integer call() throws Exception {
                            int sum = 0;
                            for (int j = 0; j <= 100; j++) {
                                sum += j;
                            }
                            return sum;
                        }
                    });

                    list.add(future);
                }

                //3.关闭线程池
                pool.shutdown();

                for (Future<Integer> future : list) {
                    System.out.println(future.get());
                }
            }
        }

        class ThreadPoolDemo implements Runnable{
            private int i = 0;
            @Override
            public void run() {
                while (i < 100) {
                    System.out.println(Thread.currentThread().getName() + ":" + i++);
                }
            }

        }

线程调度 ScheduledExecutorService

一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令。

        /**
         * 调度线程池
         */
        public class TestScheduledThreadPoolExecutor {

            public static void main(String[] args) throws Exception {
                //创建调度线程池
                ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

                for (int i = 0; i < 10; i++) {
                    //线程调度
                    ScheduledFuture<Integer> future = pool.schedule(new Callable<Integer>() {

                        @Override
                        public Integer call() throws Exception {
                            int num = new Random().nextInt(100);
                            System.out.println(Thread.currentThread().getName() + ":" + num);
                            return num;
                        }
                    }, 1, TimeUnit.SECONDS);
                    System.out.println(future.get());
                }

                //关闭线程池
                pool.shutdown();
            }
        }

ForkJoinPool 分支/合并框架 工作窃取

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总。

Fork/Join 框架与线程池的区别:
- 采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
- 相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能。

        /**
         * ForkJoinPool分支/合并框架
         */
        public class TestForkJoinPool {

            public static void main(String[] args) {
                //jdk1.8中的Instant
                Instant start = Instant.now();

                ForkJoinPool pool = new ForkJoinPool();

                ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 100000000L);

                Long sum = pool.invoke(task);
                System.out.println("结果: "+sum);

                Instant end = Instant.now();
                System.out.println("耗时: " + Duration.between(start, end).toMillis() + "秒");
            }

            /**
             * java8 新特性
             */
            @Test
            public void test(){
                Instant start = Instant.now();

                Long sum = LongStream.rangeClosed(0L, 100000000L)
                         .parallel()
                         .reduce(0L, Long::sum);

                System.out.println("结果: "+sum);
                Instant end = Instant.now();
                System.out.println("耗时: " + Duration.between(start, end).toMillis() + "秒");
            }
        }

        class ForkJoinSumCalculate extends RecursiveTask<Long>{

            private static final long serialVersionUID = -2413341168893070665L;

            private long start;
            private long end;
            //设置临界值
            private static final long THURSHOLD = 10000L;

            public ForkJoinSumCalculate(long start, long end) {
                this.start = start;
                this.end = end;
            }

            @Override
            protected Long compute() {
                long length = end -start;
                if (length <= THURSHOLD ) {
                    long sum = 0L;
                    for (long i = start; i <= end; i++) {
                        sum += i;
                    }
                    return sum;
                } else {
                    long middle = (start + end)/2;
                    ForkJoinSumCalculate left = new ForkJoinSumCalculate(start,middle);
                    //进行拆分,同时压入线程队列
                    left.fork();
                    ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
                    //进行拆分,同时压入线程队列
                    right.fork();

                    return left.join() + right.join();
                }
            }

        }

最后

以上就是稳重冥王星为你收集整理的Java并发编程常用API解析的全部内容,希望文章能够帮你解决Java并发编程常用API解析所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(53)

评论列表共有 0 条评论

立即
投稿
返回
顶部