我是靠谱客的博主 喜悦板栗,这篇文章主要介绍Java笔记:CountDownLatch - 计数锁存器、Future、CyclicBarrier - 循环屏障 和 Semaphore - 信号量... ,现在分享给大家,希望可以做个参考。


1.CountDownLatch -- 锁存器

有时在线程开发中遇到一些问题,主线程启动了多个子线程,主线程需要在子线程都结束后再做一些处理,也就是说,主线程必须知道所有子线程都结束的时候。刚开始的时候自己写一个子线程列表,启动一个子线程,加1,结束一个子线程,减1,主线程不断循环等待,当子线程列表归零时就说明所有子线程都结束了。简单的任务还可以勉强使用,但大量是用线程池的时候,发现不靠谱了,研究发现,原来jdk中已经有了该工具类--CountDownLatch


jdk文档:

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待

用给定的计数初始化CountDownLatch。由于调用了countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用CyclicBarrier

CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用countDown() 的线程打开入口前,所有调用await 的线程都一直在入口处等待。用N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。

CountDownLatch 的一个有用特性是,它不要求调用 countDown 方法的线程等到计数到达零时才继续,而在所有线程都能通过之前,它只是阻止任何线程继续通过一个await


构造方法摘要
CountDownLatch(intcount)构造一个用给定计数初始化的CountDownLatch
方法摘要
void await() 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
boolean await(longtimeout,TimeUnitunit)
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。
void countDown()递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
long getCount()返回当前计数。
String toString()返回标识此锁存器及其状态的字符串。


实例:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Test { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3); //三个个工人的协作 Worker worker1 = new Worker("张三", 4000, latch); Worker worker2 = new Worker("李四", 2000, latch); Worker worker3 = new Worker("王五", 5000, latch); worker1.start(); worker2.start(); worker3.start(); // 主线程阻塞,等待所有子线程完成(调用latch.countDown()) latch.await(); System.out.println("主线程:工作完成"); } } /** * 工人类-子线程 */ class Worker extends Thread{ private String name; // 工人姓名 private long time; // 工作时间(单位:毫秒) private CountDownLatch latch; // 计数锁存器 public Worker(String name, long time, CountDownLatch latch) { this.name = name; this.time = time; this.latch = latch; } private void doWork() throws InterruptedException { Thread.sleep(time); } public void run() { try { doWork(); // 工作中。。。 System.out.println("工人: " + name + " 完成工作"); } catch (InterruptedException e) { System.out.println("工人: " + name + " 工作出现意外"); } finally { latch.countDown(); //工人完成工作,计数器减一 } } }

运行结果:



2.Future

但有时发现CountDownLatch只知道子线程的完成情况是不够的,如果在子线程完成后获取其计算的结果,那CountDownLatch就有些捉襟见衬了,所以jdk提供的Future类,不仅可以在子线程完成后收集其结果,还可以设定子线程的超时时间,避免主任务一直等待。


方法摘要
boolean cancel(booleanmayInterruptIfRunning) 试图取消对此任务的执行。
V get()如有必要,等待计算完成,然后获取其结果。
V get(longtimeout,TimeUnitunit)如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean isCancelled()如果在任务正常完成前将其取消,则返回true
boolean isDone()
如果任务已完成,则返回 true


实例:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class Test { private static final ExecutorService executor = Executors.newCachedThreadPool(); private static Random random = new Random(); private static long timeout = 4L; /** * 启动多个子任务 */ public static void startMoreTask() { List<Callable<Integer>> subTasks = new ArrayList<Callable<Integer>>(); // 子任务集合 List<Integer> subTaskResult = new ArrayList<Integer>(); // 子任务的返回集合 // 1.初始化10个子任务 for (int i = 1; i <= 10; i ++) { SubTask subTask = new SubTask("子线程-" + i, random.nextInt(10)); // 子线程随机在10秒内完成 subTasks.add(subTask); } // 2.执行所有的子任务 try { List<Future<Integer>> futures = executor.invokeAll(subTasks); for (Future<Integer> future : futures) { try { Integer result = future.get(timeout, TimeUnit.SECONDS); // 设置每个子任务的执行时间不得超过4秒 subTaskResult.add(result); } catch (ExecutionException | TimeoutException e) { future.cancel(true); // 当出现执行异常和超时异常时,终止该子任务 } } } catch (InterruptedException e) { System.out.println("任务执行异常:" + e.getMessage()); } } public static void main(String[] args) { // subTask1测试超时的情况 SubTask subTask1 = new SubTask("子线程 - 1", 10); Future<Integer> future1 = executor.submit(subTask1); Integer result1 = null; try { result1 = future1.get(5, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过5秒 } catch (InterruptedException e) { System.out.println("线程中断出错"); future1.cancel(true);// 中断执行此任务的线程 } catch (ExecutionException e) { System.out.println("线程服务出错"); future1.cancel(true);// 中断执行此任务的线程 } catch (TimeoutException e) {// 超时异常 System.out.println("线程执行超时"); future1.cancel(true);// 中断执行此任务的线程 } System.out.println("subTask1运行结果:" + (result1 == null ? "null" : result1)); // subTask2测试拿到子线程返回结果的情况 SubTask subTask2 = new SubTask("子线程 - 2", 5); Future<Integer> future2 = executor.submit(subTask2); Integer result2 = null; try { result2 = future2.get(10, TimeUnit.SECONDS); // 设置子任务的执行时间不得超过10秒 } catch (InterruptedException e) { System.out.println("线程中断出错"); future2.cancel(true);// 中断执行此任务的线程 } catch (ExecutionException e) { System.out.println("线程服务出错"); future2.cancel(true);// 中断执行此任务的线程 } catch (TimeoutException e) {// 超时异常 System.out.println("线程执行超时"); future2.cancel(true);// 中断执行此任务的线程 } System.out.println("subTask2运行结果:" + (result2 == null ? "null" : result2)); } } class SubTask implements Callable<Integer> { private String name; // 子线程名 private int second; // 子线程完成需要的时间(秒) public SubTask (String name, int second) { this.name = name; this.second = second; } @Override public Integer call() throws Exception { System.out.println("#子线程-" + name + " 开始"); Thread.sleep(second * 1000L); System.out.println("#子线程-" + name + " 结束,耗时秒数: " + second); return second; } }


运行结果:


3.CyclicBarrier -- 循环屏障

之后又发现一个非常好用的多线程辅助类--CyclicBarrier,和CountDownLatch类似,不过适用场景不同。

jdk文档:

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。


构造方法摘要
CyclicBarrier(intparties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(intparties,RunnablebarrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
方法摘要
int await()在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。
int await(longtimeout,TimeUnitunit)在所有参与者都已经在此屏障上调用await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting()返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken() 查询此屏障是否处于损坏状态。
void reset()将屏障重置为其初始状态。


实例:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.Random; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; public class CyclicBarrierTest { private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池 @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); public static void main(String[] args) throws Exception { // 初始化CyclicBarrier //CyclicBarrier barrier = new CyclicBarrier(10); // 1.可以只传入一个参数:给定数量的参与者(线程) CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() { // 2.也可以传入第二个参数,当所有的子线程到达某个公共屏障点,进入barrier线程 @Override public void run() { System.out.println("#所有任务初始化完成"); } }); // 使用线程池 for (int i = 1; i <= barrier.getParties(); i ++) { // getParties() 返回要求启动此 barrier 的参与者数目 String name = "子任务-" + i; executor.submit(new Task(name, barrier)); } // 启动所有子线程 executor.shutdown(); Thread.sleep(Integer.MAX_VALUE); } } /** * 子任务:初始化、执行 */ class Task implements Runnable { private String name; private CyclicBarrier barrier; public Task (String name, CyclicBarrier barrier) { this.name = name; this.barrier = barrier; } @Override public void run() { try { // 子任务初始化 int sleepTime = new Random().nextInt(10); Thread.sleep(sleepTime * 1000L); System.out.println("子任务:" + this.name + " 初始化完成,使用" + sleepTime + "秒,此时已经初始化完成的子任务数量:" + barrier.getNumberWaiting()); // getNumberWaiting() 返回当前在屏障处等待的参与者数目 // 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间 barrier.await(); // 也可以换为await(long timeout, TimeUnit unit) 方法设定超时时间 // 子任务执行 System.out.println("子任务:" + this.name + " 执行完毕"); } catch (Exception e) {} } }

运行结果:



这里需要解释一下屏障点,屏障点就是某种状态,上面程序的屏障点就是所有子线程都调用了await()方法。

JDK文档中await()方法的屏障点还有其它情况:

在所有参与者都已经在此 barrier 上调用await 方法之前,将一直等待。

如果当前线程不是将到达的最后一个线程,出于调度目的,将禁用它,且在发生以下情况之一前,该线程将一直处于休眠状态:

  • 最后一个线程到达;或者
  • 其他某个线程中断当前线程;或者
  • 其他某个线程中断另一个等待线程;或者
  • 其他某个线程在等待 barrier 时超时;或者
  • 其他某个线程在此 barrier 上调用 reset()

如果当前线程:

  • 在进入此方法时已经设置了该线程的中断状态;或者
  • 在等待时被中断
则抛出 InterruptedException,并且清除当前线程的已中断状态。

如果在线程处于等待状态时 barrier 被 reset(),或者在调用 await 时 barrier 被损坏,抑或任意一个线程正处于等待状态,则抛出 BrokenBarrierException 异常。

如果任何线程在等待时被 中断,则其他所有等待线程都将抛出BrokenBarrierException 异常,并将 barrier 置于损坏状态。

如果当前线程是最后一个将要到达的线程,并且构造方法中提供了一个非空的屏障操作,则在允许其他线程继续运行之前,当前线程将运行该操作。如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,并将 barrier 置于损坏状态。

返回:
到达的当前线程的索引,其中,索引 getParties() - 1 指示将到达的第一个线程,零指示最后一个到达的线程
抛出:
InterruptedException - 如果当前线程在等待时被中断
BrokenBarrierException - 如果另一个 线程在当前线程等待时被中断或超时,或者重置了 barrier,或者在调用await 时 barrier 被损坏,抑或由于异常而导致屏障操作(如果存在)失败。


4.Semaphore -- 信号量
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。它可以控同时访问的线程个数,通过acquire() 获取一个许可,如果没有就等待,而release() 释放一个许可。
Semaphore类提供了2个构造器:
复制代码
1
2
3
4
5
6
7
public Semaphore(int permits) { //参数permits表示许可数目,即同时可以允许多少线程进行访问 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 sync = (fair)? new FairSync(permits) : new NonfairSync(permits);

Semaphore类中比较重要的几个方法,首先是acquire()、release()方法:
复制代码
1
2
3
4
5
6
7
8
9
public void acquire() throws InterruptedException { } //获取一个许可 public void acquire(int permits) throws InterruptedException { } //获取permits个许可 public void release() { } //释放一个许可 public void release(int permits) { } //释放permits个许可
acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
release()用来释放许可。注意,在释放许可之前,必须先获获得许可。

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:
复制代码
1
2
3
4
5
6
public boolean tryAcquire() { }; //尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(int permits) { }; //尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

实例:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; public class SemaphoreTest { private static ExecutorService executor = Executors.newFixedThreadPool(10, new ThreadFactory() { // 创建固定大小的守护线程池 @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); public static void main(String[] args) throws Exception { // 假定有10个工人,5个机器,1个功能需要1个机器才能工作 int workerNum = 10; int machineNum = 5; // 工人占用机器期间其它工人无法使用该机器,直到该机器被释放 Semaphore semaphore = new Semaphore(machineNum); // 开始工作 for (int i = 1; i <= workerNum; i ++) { String name = "工人-" + i; executor.submit(new Worker(name, semaphore)); } // 启动所有子线程 executor.shutdown(); Thread.sleep(Integer.MAX_VALUE); } } /** * 工人类,一个工人需要一个机器才能工作 */ class Worker implements Runnable { private String name; private Semaphore semaphore; public Worker(String name, Semaphore semaphore) { this.name = name; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); // 阻塞,等待一个许可 System.out.println("工人:" + this.name + " 占用机器生产ing"); int sleepTime = new Random().nextInt(10); Thread.sleep(sleepTime * 1000L); System.out.println("工人:" + this.name + " 占用机器生产 " + sleepTime + " 秒后释放机器"); semaphore.release(); } catch (Exception e) { } } }

运行结果:



总结,CountDownLatch、CyclicBarrier和Semaphore的区别和使用场景

1.CountDownLatch和CyclicBarrier都能够实现线程之间的等待,只不过它们侧重点不同:
(1)CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它才执行;
(2)而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;
(3)CountDownLatch是不能够重用的,而CyclicBarrier是可以重用的。
2.Semaphore其实和有点类似,它一般用于控制对某组资源的访问权限



参考:

jdk文档

java 多线程 CountDownLatch用法

Java并发编程:Callable、Future和FutureTask

java Future 接口介绍

Java并发编程:CountDownLatch、CyclicBarrier和Semaphore

Java并发之CountDownLatch、CyclicBarrier和Semaphore

CyclicBarrier介绍

Java之CyclicBarrier使用



最后

以上就是喜悦板栗最近收集整理的关于Java笔记:CountDownLatch - 计数锁存器、Future、CyclicBarrier - 循环屏障 和 Semaphore - 信号量... 的全部内容,更多相关Java笔记:CountDownLatch内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(68)

评论列表共有 0 条评论

立即
投稿
返回
顶部