复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141package org.rui.thread.newc; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.rui.generics.anonymity.BasicGenerator; import org.rui.generics.anonymity.Generator; import org.rui.thread.newc.semaphore.Fat; /** * @author lenovo * 在两个任务之间切换栅栏,当这些栅栏进入任务时,它们和自拥有一个对象,当它们离开时,它们都拥有之前由对象持有的对象。 * * == * 可以有更多的对象在被创建的同时被消费。 * */ /** * 生产者 */ class ExchangerProduer<T> implements Runnable { private Generator<T> generator;// 生成器 private Exchanger<List<T>> exchanger; private List<T> holder;// 对象集合 ExchangerProduer(Exchanger<List<T>> exchg, Generator<T> generator, List<T> holder) { this.generator = generator; this.exchanger = exchg; this.holder = holder; } @Override public void run() { try { while (!Thread.interrupted()) { for (int i = 0; i < ExchangerDemo.size; i++) { // 把加工好的对象放入集合中 T t = generator.next(); System.err.println(Thread.currentThread().getName() + ">>生产对象:" + t); holder.add(generator.next()); /** * exchange(V v) * 等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。 */ // exchange full for empty: 全部为空 holder = exchanger.exchange(holder); } } } catch (InterruptedException e) { // ok to terminate this way 可以以这种方式终止 } } } /** * demo main * @author lenovo * * @param <T> */ class ExchangerConsumer<T> implements Runnable { private Exchanger<List<T>> exchanger; private List<T> holder; private volatile T value; ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) { exchanger = ex; this.holder = holder; } @Override public void run() { try { while (!Thread.interrupted()) { // 消费者取出对象 进行处理 exchanger.exchange(holder); for (T x : holder) { value = x;// fetch out value System.out.println(Thread.currentThread().getName() + "消费对象:" + x); holder.remove(x);// ok for copyOmWriteArrayList 消费完成移除 } } } catch (InterruptedException e) { // ok to terminate this way } System.out.println("Final value:" + value); } } public class ExchangerDemo { static int size = 10; static int delay = 5; public static void main(String[] args) throws Exception { String[] argss = new String[] { "10", "5" }; if (argss.length > 0) { size = new Integer(argss[0]); } if (argss.length > 1) { delay = new Integer(argss[0]); } ExecutorService exec = Executors.newCachedThreadPool(); Exchanger<List<Fat>> xc = new Exchanger<List<Fat>>(); // ----------------------生产者 // CopyOnWriteArrayList >>>ArrayList 的一个线程安全的变体, // 其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的 List<Fat> producerList = new CopyOnWriteArrayList<Fat>(); ExchangerProduer produer = new ExchangerProduer<Fat>(xc, BasicGenerator.create(Fat.class), producerList); //当调用 exchange(), 它将阻塞直至对方任务调用它自已的exchange()方法,那时,两个exchange()方法将全部完成,而List<T>则被互转: exec.execute(produer); TimeUnit.SECONDS.sleep(delay); // ---------------------- 消费 List<Fat> consumerList = new CopyOnWriteArrayList<Fat>();// 消费者 exec.execute(new ExchangerConsumer<Fat>(xc, consumerList)); TimeUnit.SECONDS.sleep(delay); // 试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。 exec.shutdownNow(); } } /** * output: * Final value:Fat>>id:89024 */
相关辅助类
复制代码
1
2
3
4
5
6
7//Generator.java package org.rui.generics.anonymity; public interface Generator<T> { // 返回泛型的内型对象 T next(); }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25package org.rui.generics.anonymity; public class BasicGenerator<T> implements Generator<T> { private Class<T> type; public BasicGenerator(Class<T> type) { this.type = type; } @Override public T next() { try { return type.newInstance(); } catch (Exception e) { throw new RuntimeException(e); } } public static <T> Generator<T> create(Class<T> type) { return new BasicGenerator<T>(type); } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26package org.rui.thread.newc.semaphore; public class Fat { private volatile double d; private static int counter = 0; private final int id = counter++; public Fat() { // expensive, interruptible operation: for (int i = 1; i < 10000; i++) { d += (Math.PI + Math.E) / (double) i; } } public void operation() { System.out.println("operation>> "+this); } @Override public String toString() { return "Fat>>id:" + id; } }
最后
以上就是包容豌豆最近收集整理的关于java多线程并发——Exchanger 两个任务之间交换对象的全部内容,更多相关java多线程并发——Exchanger内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复