我是靠谱客的博主 彪壮曲奇,这篇文章主要介绍java控制并发流程CountDownLatch、Semaphore、Condition、CyclicBarrier的基础学习,现在分享给大家,希望可以做个参考。

什么是控制并发流程

控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作

让线程之间相互配合,来满足业务逻辑,比如让线程A等待线程B执行完毕后再执行等合作策略

有那些控制并发流程的工具类

CountDownLatch倒计时门闩

1.CountDownLatch类的作用

① 倒数门闩

② 例子:购物拼团,大巴(游乐园坐过山车排队),人满发车

③ 流程:倒数结束之前,一直处于等待状态,直到倒计时结束了,此线程才继续工作

2.CountDownLatch类的主要方法介绍

① countDownLatch(int cout):仅有一个构造函数,参数count为需要倒数的数值

② await():调用await()方法的线程会被挂起,它会等待直到count值为o才继续执行

③ countDown:将count值减1,直到为0时,等待的线程会被唤起

2.两个典型用法

① 用法一:一个线程等待多个线程都执行完毕,再继续自己的工作

工厂中质检,5个工人检查,所有人都认为通过才通过

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/** * 工厂中质检,5个工人检查,所有人都认为通过才通过 */ public class CountDownLatchDemo1 { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("NO." + no + "完成了检查"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }; service.submit(runnable); } System.out.println("等待5个人检查完......"); latch.await(); System.out.println("所有人都完成了工作,进行下一个环节"); } }

模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步,当所有人到终点后,跑步结束

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/** * 模拟100米跑步,5名选手都准备好了,只等裁判员一声令下,所有人同时开始跑步,当所有人到终点后,跑步结束 */ public class CountDownLatchDemo2 { public static void main(String[] args) throws InterruptedException { CountDownLatch begin = new CountDownLatch(1); CountDownLatch end = new CountDownLatch(5); ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { System.out.println("No." + no + "准备完毕,等待发令枪"); try { begin.await(); System.out.println("No." + no + "开始跑步了"); Thread.sleep((long) (Math.random()*10000)); System.out.println("No." + no + "跑到终点了"); } catch (InterruptedException e) { e.printStackTrace(); }finally { end.countDown(); } } }; service.submit(runnable); } Thread.sleep(5000); System.out.println("发令枪响,比赛开始!"); begin.countDown(); end.await(); System.out.println("所有人到达终点,比赛结束"); } }

② 扩展用法:多个线程等多个线程完成执行后,再同时执行

③ CountDownLatch是不能够重用的,如果需要重新计数,可以考虑CyclicBarrier或者创建新的CountDownLatch实例

3.CountDownLatch类的总结

① 两个典型用法:一等多和多等一

② CountDownLatch类在创建实例的时候,需要传递倒数次数,倒数次数到0的时候,之前等待的线程会继续进行

③ countDownLatch不能回滚重置

Semaphore信号量

1.Semaphore信号量作用

Semaphore可以用来限制或管理数量有限的资源的使用情况

2.Semaphore信号量使用流程

污染不能太多,污染许可证只能发三张

信号量的作用是维护一个"许可证"的计数,线程可以"获取"许可证,那信号量剩余的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

① 正常情况下的获取许可证

  • 第一个线程来了

  • 第二个线程来了

  • 第二个线程获得许可证

  • 线程3

  • 线程4被挡住了

  • 直到线程1归还许可证

  • 更多线程进来

  • 总结

① 初始化Semaphore并且指定许可证的数量

② 在需要被执行的任务前加acquire()或者acquireUninterruptibly()方法

③ 在任务执行结束后,调用release()来释放许可证

3.Semaphore信号量主要方法

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class SemaphoreDemo { static Semaphore semaphore = new Semaphore(3, true); public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { service.submit(new Task()); } service.shutdown(); } static class Task implements Runnable { @Override public void run() { try { semaphore.acquire(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "拿到了许可证"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "释放了许可证"); semaphore.release(3); } } }

① new Semaphore(int permits,boolean fair):这里可以设置是否要使用公平策略,如果传入true,那么Semaphore会把之前等待的线程方到FIFO队列里,以便于当有了新的许可证,可以分发给之前等了最长时间的线程

② acquire():从该信号量获取许可证,阻止直到可用,或线程为 interrupted,即响应中断

③ acquireUninterruptibly():从这个信号灯获取许可证,阻止一个可用的,即不响应中断。

④ tryAcquire():看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没关系,我不必陷入阻塞,我可以去做别的事情,过一会儿再来查看许可证的空闲情况

⑤ tryAcquire(timeout):和tryAcquire()一样,但是多了一个超时时间,比如"3秒内获取不到别的线程,我就去做别的线程"

3.Semaphore信号量特殊用法

① 一次性获取或释放多个许可证

比如TaskA会调用很消耗资源的method1(),而TaskB调用的是不太消耗资源的method2(),假设我们一共有5个许可证,那么我们可以要求TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求合理分配资源

② 注意点:获取和释放的许可证数量必须一致,否则比如每次都获取2个但是只释放1个甚至不释放,随着时间的推移,道最后许可证数量不够用,会导致程序卡死

③ 注意在初始化Semaohore的时候设置公平性,一般设置为true会更合理

④ 并不是必须由许可证的线程释放那个许可证,事实上,获取和释放许可证对线程并无要求,也许是A获取了,然后由B释放了,只要逻辑合理即可

⑤ 信号量的作用,除了控制临界区最多同时有N个线程访问外,另一个作用是可以实现"条件等待",例如线程1需要在线程2完成准备工作后才能开始工作,那么就线程1 acquire(),而线程2完成任务后release(),这样的话,相当于轻量级的CountDownLatch

Condition接口(又称为条件对象)

1.Condition作用

当线程1需要等待某个条件时,它就去condition.await()方法,一旦执行了await()方法,线程就会进入阻塞状态

然后通知会有另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就会去执行condition.signal()方法,这时JVM就会从阻塞队列中找,找到那些等待condition的线程,当线程1就会收到可执行信号的时候,它的线程状态就会变成Runnable可执行状态

2.signalAll()和signal()区别

① signalAll()会唤醒所有的正在等待的线程

② 但是signal()是公平的,只会唤起那个等待时间最长的线程

3.用Condition实现生产者消费者模式

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class ConditionDemo2 { private int queueSize = 10; private PriorityQueue<Integer> queue = new PriorityQueue<>(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionDemo2 conditionDemo2 = new ConditionDemo2(); ConSummer conSummer = conditionDemo2.new ConSummer(); Producer producer = conditionDemo2.new Producer(); conSummer.start(); producer.start(); } class ConSummer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("队列为空"); try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signalAll(); System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素"); } finally { lock.unlock(); } } } } class Producer extends Thread { @Override public void run() { producer(); } private void producer() { while (true) { lock.lock(); try { while (queue.size() == queueSize) { System.out.println("队列满,等待有空余"); try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(1); notEmpty.signalAll(); System.out.println("从队列里插入了一个元素,队列剩余空间" + (queueSize - queue.size())); } finally { lock.unlock(); } } } } }

4.Condition注意点

① 实际上,如果说Lock用来代替synchronized,那么condition就是用来代替相对应的Object.wait/notify的,所以在用法和性质上,几乎都一样

② awaiot方法会自动释放持有的Lock锁,和Object.wait一样,不需要手动先释放锁

③ 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

CyclicBarrier循环栅栏

1.CyclicBarrier介绍

① CyclicBarrier循环栅栏和CountDownLatch很类似,都能阻塞一组线程

② 当有大量线程相互配合,分别计算不同的任务,并且需要最后统一汇总的时候,我们可以使用CyclicBarrier。CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,它就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务。

③ 生活中例子:“咱们5个人明天中午在学校碰面,都到齐后,一起讨论下学期的计划”

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() { @Override public void run() { System.out.println("这一批人都到场了,大家统一出发"); } }); for (int i = 0; i < 10; i++) { new Thread(new Task(i, cyclicBarrier)).start(); } } static class Task implements Runnable { private int id; private CyclicBarrier cyclicBarrier; public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("线程" + id + "现在前往集合地点"); try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("线程" + id + "到了集合地点" + "开始等待其他人到达"); cyclicBarrier.await(); System.out.println("线程" + id + "出发了"); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } } }

2.CyclicBarrier和CountDownLatch的区别

① 作用不同:CyclicBarrier要等固定数量的线程都到达了栅栏位置才能继续执行,而CountDownLatch只需等待数字到0,也就是说,CountDownLatch用于事件,但是CyclicBarrier是用于线程的

② 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例,而CyclicBarrier可以重复使用

最后

以上就是彪壮曲奇最近收集整理的关于java控制并发流程CountDownLatch、Semaphore、Condition、CyclicBarrier的基础学习的全部内容,更多相关java控制并发流程CountDownLatch、Semaphore、Condition、CyclicBarrier内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(71)

评论列表共有 0 条评论

立即
投稿
返回
顶部