我是靠谱客的博主 飞快日记本,最近开发中收集的这篇文章主要介绍java 多线程协作_Java多线程 3.线程协作,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

3.1 线程协作和线程安全关系

线程协作指不同线程驱动的任务相互依赖,依赖一般就是对共享资源的依赖;

有线程协作一定会有资源共享,有共享就有竞争,有竞争就会有线程安全问题;

线程安全是通过synchronized和Lock机制来保证线程安全;

线程协作通过Object的wait/notify方法,Thread的join/sleep/interrupt/yield,拥塞队列来实现;

线程安全和线程协作编码时可以互不依赖,但是更通用的场景是线程协作要添加线程安全的机制;

3.2 常见线程协作实现

典型的问题就是生产者-消费者问题,生产者和消费者不停地去更新(存/取)商品,商品就是依赖关系的纽带,商品也是生产者线程和消费者线程共享的资源;

以生产者-消费者为例,介绍线程协作的几种实现方式:生产者Producer、消费者Consumer、商品Commodity

>商品类

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.text.SimpleDateFormat;2 importjava.util.Date;3

4 /**

5 * 商品类6 */

7 public classCommodity {8 privateString name;9 privateDate productTime;10

11 publicCommodity() {12 }13

14 publicCommodity(String name, Date productTime) {15 this.name =name;16 this.productTime =productTime;17 }18

19 publicString getName() {20 returnname;21 }22

23 public voidsetName(String name) {24 this.name =name;25 }26

27 publicDate getProductTime() {28 returnproductTime;29 }30

31 public voidsetProductTime(Date productTime) {32 this.productTime =productTime;33 }34

35 @Override36 publicString toString() {37 return "商品:{名称:" + name + ", 生产日期:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(this.getProductTime38 ())) + "}";39 }40 }

View Code

3.2.1 Thread的join/yield/sleep

join:线程联合

join为Thread的实例方法,用于线程联合,在线程thread1中调用线程thread2.join()方法,thread1会让出CPU使用进入阻塞直到thread2线程执行结束;如果调用thread2.join指定时间参数,则表示thread1的最多等待时间;

说明:如果join和synchronized同时使用时需要注意,线程thread1中调用thread2.join方法,thread1只让出CPU使用但不释放锁,这很容易死锁;

yield:线程切换

yield为Thread静态方法,线程放弃继续执行,让出cpu释放锁进入可运行状态

说明:执行yield只是让线程暂停一下,让系统重新调度,大多数情况,yield后系统会继续选择当前线程执行(所以不好验证)

sleep:线程休眠

sleep为Thread的类方法,让当前线程让出CPU使用并休眠参数指定的时间

说明:类似join方法,线程休眠期间只会让出cpu使用不会释放任何锁

sleep实现生产者消费者问题

>线程驱动生产者消费者类

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.ArrayList;2 importjava.util.Date;3 importjava.util.List;4 importjava.util.concurrent.TimeUnit;5

6 /**

7 * 生产者和消费者2个线程同时访问仓库这个共享资源8 * 没有添加线程安全保证,存在线程安全问题9 */

10 public classK1ThreadSleep {11 //仓库大小

12 private static int REPO_SIZE = 10;13 //仓库

14 private static List repo = newArrayList(REPO_SIZE);15

16 public static void main(String[] args) throwsException {17 new Thread(newProducer(repo, REPO_SIZE)).start();18 new Thread(newConsumer(repo)).start();19 }20 }21

22 /**

23 * 生产者24 */

25 class Producer implementsRunnable {26 //仓库

27 private Listrepo;28 private intrepoSize;29

30 public Producer(List repo, intrepoSize) {31 this.repo =repo;32 this.repoSize =repoSize;33 }34

35 @Override36 public voidrun() {37 //机器一直运行着

38 while (true) {39 try{40 if (repoSize ==repo.size()) {41 //如果仓库满了,机器休息5秒

42 TimeUnit.SECONDS.sleep(5);43 } else{44 //仓库没有满,机器1秒钟生产一个产品

45 TimeUnit.SECONDS.sleep(1);46 Commodity commodity = new Commodity("", newDate());47 repo.add(commodity);48 System.out.println("生产商品:" +commodity.toString());49 }50 } catch(InterruptedException e) {51 System.out.println("中断...");52 }53 }54 }55 }56

57 /**

58 * 消费者59 */

60 class Consumer implementsRunnable {61 //仓库

62 private Listrepo;63

64 public Consumer(Listrepo) {65 this.repo =repo;66 }67

68 @Override69 public voidrun() {70 while (true) {71 try{72 //如果仓库空了,则饿1秒钟

73 if (0 ==repo.size()) {74 TimeUnit.SECONDS.sleep(1);75 } else{76 //如果仓库中有商品,则5秒钟用完一个

77 TimeUnit.SECONDS.sleep(5);78 System.out.println("消费商品:" + repo.remove(0).toString());79 }80 } catch(InterruptedException e) {81 System.out.println("中断...");82 }83 }84 }85 }

View Code

>运行结果:开始生产很快,运行一段时间后 每消费一个商品就生产一个

>为什么:为什么没有添加线程安全?  因为sleep不释放锁

3.2.2 Object的wait/notify

wait和notify是Object的实例方法,和线程安全、线程协作都有关系,线程安全指wait和notify方法只能在synchronized临界区内调用,线程协作指wait使当前线程释放cpu和锁进入等待阻塞,notify/notifyAll唤醒因调用当前对象wait方法进入等待阻塞的线程;

说明:

1. wait可以设置超时等待时间,超时后自己进入同步阻塞状态,无需其他线程唤醒

2. notify/notifyAll唤醒当前对象上的等待线程,使等待线程进入同步阻塞状态,同步阻塞状态的线程均等机会获取锁(恰恰说明synchronized的锁是非公平锁),notify()是随机唤醒单个线程,而notifyAll()是唤醒所有的线程。

3. wait和notify只能在synchronized临界区内调用,并且调用对象只能是进入synchronized临界区的锁的归属对象,否则IllegalMonitorStateException

出错场景1:synchronized(XX.class){wait()} -->> 获得类锁没有取得对象锁

出错场景2:synchronized(a){wait()} -->> 获得的是a对象的锁,而不是当前对象this的锁

出错场景3:synchronized(this){a.wait()} -->> 获得的当前对象的锁,而不是a对象的锁

4.sleep和wait的异同:

sleep()不释放,wait()释放锁;

都可以响应interrupt()中断,从而使线程立刻抛出InterruptedException(但不建议使用该方法)

>线程驱动生产者消费者类

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.ArrayList;2 importjava.util.Date;3 importjava.util.List;4 importjava.util.concurrent.TimeUnit;5

6 /**

7 * Created by 巩聪 on 2018/7/11.8 */

9 public classK2ObjectWait {10 //仓库大小

11 private static int REPO_SIZE = 5;12 //仓库

13 private static List repo = newArrayList(REPO_SIZE);14 //synchronized锁归属对象

15 private static Object synLock = newObject();16

17 public static void main(String[] args) throwsException {18 new Thread(newK2Producer(repo, REPO_SIZE, synLock)).start();19 new Thread(newK2Consumer(repo, synLock)).start();20 }21 }22

23 /**

24 * 生产者25 */

26 class K2Producer implementsRunnable {27 //仓库

28 private Listrepo;29 private intrepoSize;30 privateObject synLock;31

32 public K2Producer(List repo, intrepoSize, Object synLock) {33 this.repo =repo;34 this.repoSize =repoSize;35 this.synLock =synLock;36 }37

38 @Override39 public voidrun() {40 //机器一直运行着

41 while (true) {42 //机器1秒钟生产一个产品,生产后尝试存储到仓库

43 long start =System.currentTimeMillis();44 while (System.currentTimeMillis() - start < 1000) {45 }46 Commodity commodity = new Commodity("", newDate());47 try{48 synchronized(synLock) {49 //如果仓库满了进入等待阻塞,等待消费者线程唤醒,唤醒后继续执行

50 if (repoSize ==repo.size()) {51 synLock.wait();52 }53 //仓库有位置,将商品存入仓库

54 repo.add(commodity);55 System.out.println("生产商品:" +commodity.toString());56 //如果消费者因等待商品进入等待阻塞,唤醒消费者,仓库有商品了

57 synLock.notify();58 }59 } catch(InterruptedException e) {60 System.out.println("中断...");61 }62

63 }64 }65 }66

67 /**

68 * 消费者69 */

70 class K2Consumer implementsRunnable {71 //仓库

72 private Listrepo;73 privateObject synLock;74

75 public K2Consumer(Listrepo, Object synLock) {76 this.repo =repo;77 this.synLock =synLock;78 }79

80 @Override81 public voidrun() {82 while (true) {83 try{84 synchronized(synLock) {85 //如果仓库空了,进入等待阻塞,等待生产线程唤醒,唤醒后继续执行

86 if (0 ==repo.size()) {87 synLock.wait();88 }89 //在仓库取出一个商品

90 Commodity commodity = repo.remove(0);91 System.out.println("消费商品:" +commodity.toString());92 //如果生产者阻塞,唤醒生产者,仓库有位置了

93 synLock.notify();94 }95 //5秒钟消费完商品

96 long start =System.currentTimeMillis();97 while (System.currentTimeMillis() - start < 5000) {98 }99 } catch(InterruptedException e) {100 System.out.println("中断...");101 }102 }103 }104 }

View Code

>运行结果:生产消费很快趋于稳定

3.2.3Thread的中断方法

每个线程都有一个标志位,标志当前线程是否中断,Thread类中有获取当前线程中断状态及设置当前线程为中断状态的方法;

interrupted:Thread的类方法,获取当前线程的中断状态,并重置当前线程为非中断状态;

isInterrupted:Thread的实例方法,获取当前线程的中断状态;

interrupt:Thread的实例方法,设置当前线程为中断状态;(只是单纯的设置线程的中断标志,至于线程中断后做什么在线程驱动的任务中可以通过捕获异常或获取中断状态后自己定义)

说明:

1. 如果线程因wait/sleep/join方法进入等待阻塞,或因调用Lock对象的lockInterrupity/tryLock(time)进入同步阻塞状态,其他线程中调用interrupt方法会导致阻塞线程中抛出InterruptException异常;

2. 阻塞状态的线程抛出InterruptException时会重置中断标志(标志位false);

3. 类方法interrupted获取中断状态后会清除中断状态,实例方法isInterrupted()只是获取中断状态;

4. 除使用interrupt方法中断线程外,还有2种方式中止线程执行,a. 退出标志使线程正常退出(线程通信);b.使用stop()方法强行终止线程(不推荐,可能发生不可预料的结果)

1. 中断使用场景:中断阻塞状态

>中断阻塞举例:中断sleep阻塞

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.concurrent.TimeUnit;2

3 /**

4 * Created by 巩聪 on 2018/7/12.5 */

6 public classK3ThreadInterrupt {7 public static void main(String[] args ) throwsException8 {9 Thread thread = new Thread(()->{10 try{11 //可以响应中断的阻塞有很多,例如:wait/join/sleep/lock.tryLock(time)/lock.lockInterruptibly

12 TimeUnit.SECONDS.sleep(10);13 } catch(InterruptedException e) {14 //捕获异常后线程恢复中断

15 System.out.println("休眠阻塞的线程被其他线程中断了...");16 }17 });18 thread.start();19 TimeUnit.SECONDS.sleep(1);20 thread.interrupt();21 }22 }

View Code

2. 中断使用场景:线程通信,实现生产者消费者问题

>线程驱动生产者消费者类

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.ArrayList;2 importjava.util.Date;3 importjava.util.List;4 importjava.util.concurrent.TimeUnit;5

6 public classK4ThreadInterrupt {7 public static void main(String [] args) throwsInterruptedException {8 List repo = new ArrayList(3);9 new Thread(new K4Producer(repo, 3), "producer").start();10 new Thread(new K4Consumer(repo), "consumer").start();11 }12 public staticThread getThread(String threadName)13 {14 ThreadGroup threadGroup =Thread.currentThread().getThreadGroup();15 Thread[] threads = newThread[threadGroup.activeCount()];16 threadGroup.enumerate(threads);17 for(Thread thread : threads) {18 if(thread!=null &&threadName.equals(thread.getName()))19 {20 returnthread;21 }22 }23 return null;24 }25 public static void lastTime(TimeUnit timeUnit, inttime)26 {27 long start =System.currentTimeMillis();28 long msTime =timeUnit.toMillis(time);29 while (System.currentTimeMillis() - start repo;35 private intrepoSize;36 privateThread consumer;37

38 public K4Producer(List repo, intrepoSize) {39 this.repo =repo;40 this.repoSize =repoSize;41 }42

43 public voidsetConsumer(Thread consumer) {44 this.consumer =consumer;45 }46

47 @Override48 public voidrun() {49 while (true)50 {51 //当前为中断状态(判定后清除),且容器没有满

52 if(Thread.interrupted() && repo.size() repo;69 privateThread producer;70

71 public K4Consumer(Listrepo) {72 this.repo =repo;73 }74 public voidsetProducer(Thread producer) {75 this.producer =producer;76 }77 @Override78 public voidrun() {79 while (true)80 {81 //当前为中断状态(判定后清除),且容器没有满

82 if(Thread.interrupted() && repo.size() > 0)83 {84 lastTime(TimeUnit.SECONDS, 5);85 Commodity commodity = repo.remove(0);86 System.out.println("消费:"+commodity.toString());87 }88 Thread producer = getThread("producer");89 if(null !=producer)90 {91 producer.interrupt();92 }93 }94

95 }96 }97 }

View Code

说明:java volatile也可以实现类似功能

>运行结果:生产消费很快趋于稳定

3.2.4 拥塞队列/BlockingQueue

先粗略说一下java的容器,java容器有Collection和Map2个根接口,Collection又分为Set/集合、List/列表、Queue/队列;Set、List、Queue和Map就是我们常用的4种容器;

在线程安全的角度容器又分为线程安全和线程不安全的容器,线程不安全的常用容器有HashSet、ArrayList、PriorityQueue、HashMap等,如果多线程同时读写会有容器内数据不一致问题;

线程安全的容器又分为同步容器和并发容器,其中同步容器有Vector、Stack、HashTable,同步容器是在方法上面添加synchronized来保证线程安全;

并发容器指java.util.concurrent包中定义的容器,如常见的ConcurrentHashMap、CopyOnWriteArraySet、CopyOnWriteArrayList等,同步容器一般是通过各种Lock来保证线程安全的;

java.util.concurrent包中有一个比较特殊的容器接口BlockingQueue/阻塞队列,除了提供了线程安全外,还提供原有方法的阻塞版本,会对线程的调用产生阻塞,常用于线程池存放任务和生产者消费者问题;

队列的基本特性是先进先出,从队尾插入,队头删除;虽然Queue实现了Collection有删除指定元素的特性,但是使用队列一般都按照队列的使用方式队尾插入,队头删除;

JDK8提供了6种单向拥塞队列和1种双向拥塞队列:

1. 拥塞队列的常用方法

添加元素:

add(e): 非阻塞,队列满时抛出异常

offer(e)/offer(e, time): 非阻塞或定时阻塞,添加成功返回true,队列满时返回false,阻塞过程可以响应中断

put(e):阻塞,队列满时一直阻塞,阻塞过程可以响应中断

删除并返回元素:

remove():非阻塞,队列为空时抛出异常

poll()/poll(time):非阻塞或定时阻塞,队列为空返回null,阻塞过程可以响应中断

take():阻塞,队列为空时一直阻塞,阻塞过程可以响应中断

2.队列分类:

ArrayBlockingQueue/LinkedBlockingQueue:数组实现队列,需要指定大小,内存连续 ;       链表实现队列,默认大小为Integer.MAX_VALUE, 内存可以不连续

>队列基本的添加和删除

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.concurrent.ArrayBlockingQueue;2 importjava.util.concurrent.BlockingQueue;3 importjava.util.concurrent.TimeUnit;4

5 /**

6 * 数组和链表队列是最基本的2种队列,实现队列的3种添加,3种删除方法7 */

8 public classK0BlockingQueue {9 public static void main(String[] args) throwsException {10 //数组队列必须指定大小

11 BlockingQueue queue = new ArrayBlockingQueue(10);12 //链表队列可以省略队列大小参数,默认为Integer.MAX_VALUE13 //queue = new LinkedBlockingDeque(10);

14 new Thread(() ->{15 int count = 0;16 while (true) {17 //每隔2秒向队列放一个

18 lastTime(TimeUnit.SECONDS, 2);19 booleanresult;20 //如果队列满了,会抛出异常21 //result = queue.add("element"+(++count));22 //队列满了,返回false

23 result = queue.offer("element" + (++count));24 //try {25 // //队列满了,会阻塞,方法没有返回值26 //queue.put("element"+(++count));27 //} catch (InterruptedException e) {28 //System.out.println("中断...");29 //}

30 System.out.println("添加结果:" +result);31 }32 }).start();33 while (true) {34 while (true) {35 //每1秒从队列取元素

36 lastTime(TimeUnit.SECONDS, 1);37 String element = "";38 //队列为空时抛出异常39 //element = queue.remove();40 //队列为空时,返回null

41 element =queue.poll();42 //队列为空时阻塞43 //element = queue.take();

44 System.out.println("取出结果:" +element);45 }46 }47 }48

49 //空循环指定时间

50 public static void lastTime(TimeUnit timeUnit, longtime) {51 long start =System.currentTimeMillis();52 while (timeUnit.toMillis(time) > System.currentTimeMillis() -start) {53 }54 }55 }

View Code

SynchronousQueue:同步队列,大小为1(不存储元素),一个添加操作后必须删除操作

>同步队列添加和删除

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.concurrent.BlockingQueue;2 importjava.util.concurrent.SynchronousQueue;3 importjava.util.concurrent.TimeUnit;4

5 /**

6 * 同步队列比较特殊,队列里面不能缓存多于1个元素,添加和删除必须依次执行,不会连续2次添加成功,不会连续2次删除成功7 * 一般使用阻塞方法8 */

9 public classK0BlockingQueue {10 public static void main(String[] args) throwsException {11 //同步队列没有大小,构造函数只有一个Boolean类型参数,标识是否公平队列

12 BlockingQueue queue = newSynchronousQueue();13 new Thread(() ->{14 int count = 0;15 while (true) {16 try{17 //队列满了,会阻塞,方法没有返回值

18 String element = "element"+(++count);19 queue.put(element);20 System.out.println("添加成功:"+element);21 } catch(InterruptedException e) {22 System.out.println("中断...");23 }24 }25 }).start();26 while (true) {27 while (true) {28 //3秒取出一个元素

29 lastTime(TimeUnit.SECONDS, 3);30 //队列为空时阻塞

31 String element =queue.take();32 System.out.println("取出结果:" +element);33 }34 }35 }36 //空循环指定时间

37 public static void lastTime(TimeUnit timeUnit, longtime) {38 long start =System.currentTimeMillis();39 while (timeUnit.toMillis(time) > System.currentTimeMillis() -start) {40 }41 }42 }

View Code

PriorityBlockingQueue:优先级队列,队列元素必须实现Comparable接口(否则运行异常)复写compareTo(T)方法

>优先级队列添加和删除

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.concurrent.BlockingQueue;2 importjava.util.concurrent.PriorityBlockingQueue;3 importjava.util.concurrent.TimeUnit;4

5 /**

6 * 优先级队列提供了队列元素插队功能,新添加的元素不一定在队尾7 * 队列元素必须实现Comparable接口8 */

9 public classK0BlockingQueue {10 public static void main(String[] args) throwsException {11 //优先级队列元素必须实现Comparable接口 (或者构造函数提供Comparator实现)

12 BlockingQueue queue = newPriorityBlockingQueue();13 new Thread(() ->{14 //分5批添加,每次添加3个

15 for (int i = 1; i <= 5; i++) {16 for (int j = 1; j <= 3; j++) {17 Element element = new Element(i, j == 3 ? 'W' : 'M');18 try{19 queue.put(element);20 } catch(InterruptedException e) {21 System.out.println("中断了...");22 }23

24 }25 System.out.println(String.format("第%d批添加完毕.", i));26 lastTime(TimeUnit.SECONDS, 3);27 }28 }).start();29

30 while (true) {31 while (true) {32 //1秒取出一个元素

33 lastTime(TimeUnit.SECONDS, 2);34 Element element =queue.take();35 System.out.println("取出元素:" +element.toString());36 }37 }38 }39

40 //空循环指定时间

41 public static void lastTime(TimeUnit timeUnit, longtime) {42 long start =System.currentTimeMillis();43 while (timeUnit.toMillis(time) > System.currentTimeMillis() -start) {44 }45 }46

47 /**

48 * 队列元素49 */

50 static class Element implements Comparable{51 //年龄

52 private intage;53 //性别:M,W

54 private charsex;55

56 public Element(int age, charsex) {57 this.age =age;58 this.sex =sex;59 }60

61 public intgetAge() {62 returnage;63 }64

65 public void setAge(intage) {66 this.age =age;67 }68

69 public chargetSex() {70 returnsex;71 }72

73 public void setSex(charsex) {74 this.sex =sex;75 }76

77 /**

78 * 年龄大的优先79 * 年龄相同女士优先80 * 年龄和性别相同随便81 *82 *@paramo83 *@return

84 */

85 @Override86 public intcompareTo(Element o) {87 int result = o.getAge() - this.getAge();88 if (result == 0) {89 if (this.getSex() == 'W') {90 result = -1;91 } else{92 result = 1;93 }94 }95 returnresult;96 }97

98 @Override99 publicString toString() {100 return "Element{age=" + age + ", sex=" + sex + "}";101 }102 }103 }

View Code

>执行结果:先取出队列中年龄大的女性元素  (因为连续添加3个元素没有添加同步,所以可能出现同一批添加过程有取出操作)

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

第1批添加完毕.

取出元素:Element{age=1, sex=W}

第2批添加完毕.

取出元素:Element{age=2, sex=W}

取出元素:Element{age=2, sex=M}

第3批添加完毕.

取出元素:Element{age=3, sex=W}

第4批添加完毕.

取出元素:Element{age=4, sex=W}

取出元素:Element{age=4, sex=M}

第5批添加完毕.

取出元素:Element{age=5, sex=W}

取出元素:Element{age=5, sex=M}

取出元素:Element{age=5, sex=M}

取出元素:Element{age=4, sex=M}

取出元素:Element{age=3, sex=M}

取出元素:Element{age=3, sex=M}

取出元素:Element{age=2, sex=M}

取出元素:Element{age=1, sex=M}

取出元素:Element{age=1, sex=M}

View Code

DelayQueue:延迟队列,队列元素必须实现Delayed接口(否则运行异常),Delayed接口继承了Comparable接口,所以必须复写compareTo(T)和getDelay(TimeUnit)2个方法,队列中的首个元素,如果getDelay返回值小于等于0则会出队列

>延迟队列添加和删除

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.text.SimpleDateFormat;2 importjava.util.Date;3 importjava.util.concurrent.BlockingQueue;4 importjava.util.concurrent.DelayQueue;5 importjava.util.concurrent.Delayed;6 importjava.util.concurrent.TimeUnit;7

8 /**

9 * 延迟队列指元素添加到队列中后,可以让元素不被立即取出,而是等待指定时间或条件10 * 队列元素必须实现Delayed,(Delayed接口继承了Comparable接口)11 */

12 public classK0BlockingQueue {13 public static void main(String[] args) throwsException {14 //延迟队列必须实现Delayed接口

15 BlockingQueue queue = newDelayQueue();16 new Thread(() ->{17 while (true) {18 lastTime(TimeUnit.SECONDS, 3);19 queue.offer(newElement());20 }21 }).start();22

23 while (true) {24 while (true) {25 //1秒取出一个元素

26 lastTime(TimeUnit.SECONDS, 1);27 System.out.println("取出元素:" +queue.take().toString());28 }29 }30 }31

32 //空循环指定时间

33 public static void lastTime(TimeUnit timeUnit, longtime) {34 long start =System.currentTimeMillis();35 while (timeUnit.toMillis(time) > System.currentTimeMillis() -start) {36 }37 }38

39 /**

40 * 队列元素41 */

42 static class Element implementsDelayed {43 private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");44 privateDate addDate;45

46 publicDate getAddDate() {47 returnaddDate;48 }49

50 publicElement() {51 addDate = newDate();52 }53

54 /**

55 * 返回小于等于0的值时,元素可以被取出56 *57 *@paramunit58 *@return

59 */

60 @Override61 public longgetDelay(TimeUnit unit) {62 //addDate.getTime() 返回毫秒63 //3秒后取出

64 return addDate.getTime() + TimeUnit.SECONDS.toMillis(3) -System.currentTimeMillis();65 }66

67 @Override68 public intcompareTo(Delayed o) {69 if (o instanceofElement) {70 return (int) (this.getAddDate().getTime() -((Element) o).getAddDate().getTime());71 }72 return 0;73 }74

75 @Override76 publicString toString() {77 return "Element{addTime=" + dateFormat.format(this.addDate) + ", getTime=" + dateFormat.format(new Date()) +

78 "}";79 }80 }81 }

View Code

LinkedTransferQueue

LinkedBlockingDeque

3. 队列实现线程协作 : 使用同步队列

>延迟队列添加和删除

8f900a89c6347c561fdf2122f13be562.png

961ddebeb323a10fe0623af514929fc1.png

1 importjava.util.Date;2 importjava.util.concurrent.BlockingQueue;3 importjava.util.concurrent.SynchronousQueue;4

5 /**

6 *拥塞队列是实现生产者/消费者问题最简单的方式7 */

8 public classK4Blocking {9 public static void main(String[] args) throwsException {10 BlockingQueue repo = newSynchronousQueue();11 new Thread(newK4Producer(repo)).start();12 new Thread(newK4Consumer(repo)).start();13 }14 }15

16 class K4Producer implementsRunnable {17 private BlockingQueuerepo;18

19 public K4Producer(BlockingQueuerepo) {20 this.repo =repo;21 }22

23 @Override24 public voidrun() {25 while (true) {26 try{27 //卖方市场:消费者会因为没有商品而阻塞,生产者生产完会直接被消费28 //买方市场:需要在消费者线程控制时间

29 long start =System.currentTimeMillis();30 while (System.currentTimeMillis() - start < 3000) {31 }32 Commodity commodity = new Commodity("...", newDate());33 System.out.println("生产商品:"+commodity.toString());34 this.repo.put(commodity);35 } catch(InterruptedException e) {36 System.out.println("中断了...");37 }38 }39 }40 }41

42 class K4Consumer implementsRunnable {43 private BlockingQueuerepo;44

45 public K4Consumer(BlockingQueuerepo) {46 this.repo =repo;47 }48

49 @Override50 public voidrun() {51 while (true) {52 try{53 Commodity commodity =repo.take();54 System.out.println("消费商品:"+commodity.toString());55 } catch(InterruptedException e) {56 System.out.println("中断了...");57 }58 }59 }60 }

View Code

>执行结果:每生产一个商品,瞬间消费

最后

以上就是飞快日记本为你收集整理的java 多线程协作_Java多线程 3.线程协作的全部内容,希望文章能够帮你解决java 多线程协作_Java多线程 3.线程协作所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部