我是靠谱客的博主 洁净绿茶,最近开发中收集的这篇文章主要介绍Java编程思想笔记——并发3,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

终结任务

装饰性花园

公园人数共享计数值递增:

class Count {
private int count = 0;
private Random rand = new Random(47);
// Remove the synchronized keyword to see counting fail:
public synchronized int increment() {
int temp = count;
if (rand.nextBoolean()) {// Yield half the time
Thread.yield();
}
return (count = ++temp);
}
public synchronized int value() {
return count;
}
}
class Entrance implements Runnable {
private static Count count = new Count();
private static List<Entrance> entrances =
new ArrayList<Entrance>();
private int number = 0;
// Doesn't need synchronization to read:
private final int id;
private static volatile boolean canceled = false;
// Atomic operation on a volatile field:
public static void cancel() {
canceled = true;
}
public Entrance(int id) {
this.id = id;
// Keep this task in a list. Also prevents
// garbage collection of dead tasks:
entrances.add(this);
}
@Override
public void run() {
while (!canceled) {
synchronized (this) {
++number;
}
print(this + " Total: " + count.increment());
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
print("sleep interrupted");
}
}
print("Stopping " + this);
}
public synchronized int getValue() {
return number;
}
@Override
public String toString() {
return "Entrance " + id + ": " + getValue();
}
public static int getTotalCount() {
return count.value();
}
public static int sumEntrances() {
int sum = 0;
for (Entrance entrance : entrances) {
sum += entrance.getValue();
}
return sum;
}
}
public class OrnamentalGarden {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exec.execute(new Entrance(i));
}
// Run for a while, then stop and collect the data:
TimeUnit.SECONDS.sleep(3);
Entrance.cancel();
exec.shutdown();
if (!exec.awaitTermination(250, TimeUnit.MILLISECONDS)) {
print("Some tasks were not terminated!");
}
print("Total: " + Entrance.getTotalCount());
print("Sum of Entrances: " + Entrance.sumEntrances());
}
} /*
Entrance 0: 1 Total: 1
Entrance 2: 1 Total: 3
Entrance 1: 1 Total: 2
Entrance 4: 1 Total: 5
Entrance 3: 1 Total: 4
Entrance 2: 2 Total: 6
Entrance 4: 2 Total: 7
Entrance 0: 2 Total: 8
...
Entrance 3: 29 Total: 143
Entrance 0: 29 Total: 144
Entrance 4: 29 Total: 145
Entrance 2: 30 Total: 147
Entrance 1: 30 Total: 146
Entrance 0: 30 Total: 149
Entrance 3: 30 Total: 148
Entrance 4: 30 Total: 150
Stopping Entrance 2: 30
Stopping Entrance 1: 30
Stopping Entrance 0: 30
Stopping Entrance 3: 30
Stopping Entrance 4: 30
Total: 150
Sum of Entrances: 150
*/

每个Entrance任务都维护着一个本地值number,它包含通过某个特定入口进入的参观者的数量。这提供了对count对象的双层检查,以确保其记录的参观者数量是正确的。
在3秒之后,main()向Entrance发送static cancel()消息,然后调用exec对象的shutdown()方法。ExecutorService.awaitTermination()等待每个任务结束,如果所有的任务在超时时间达到之前全部结束,则返回true,否则返回false,表示不是所有的任务都已经结束了。尽管这会导致每个任务都退出其run()方法,并因此作为任务而终止,但是Entrance对象仍旧是有效的。

在阻塞时终结

线程状态:
新建(new),已经分配必须的系统资源,并执行了初始化。有资格获得CPU时间
就绪(Runnable),调度器把时间片分配给线程,在任意时刻,线程可以运行也可以不运行。只要调度器能分配时间片给线程,它就可以运行,这不同于死亡和阻塞状态。
阻塞(Blocked),线程能够运行,但某个条件阻止他的运行。当处于阻塞状态时,调度器将忽略线程,不在分配CPU时间。直到线程重新进入了就绪状态。
死亡(Dead),不可调度,不会得到CPU时间

进入阻塞状态

1.sleep()进入休眠状态
2.调用wait()使线程挂起
3.任务在等待有个输入和输出完成
4.试图在有个对象调用同步控制方法,但对象锁不可用

中断

Thread类包含interrupt()方法,因此可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。
如果在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的所有线程。
希望只中断某个单一任务,如果使用Executor,那么通过调用submit()而不是executor()来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future

class SleepBlocked implements Runnable {
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
print("InterruptedException");
}
print("Exiting SleepBlocked.run()");
}
}
class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
in = is;
}
public void run() {
try {
print("Waiting for read():");
in.read();
} catch (IOException e) {
if (Thread.currentThread().isInterrupted()) {
print("Interrupted from blocked I/O");
} else {
throw new RuntimeException(e);
}
}
print("Exiting IOBlocked.run()");
}
}
class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while (true) // Never releases lock
Thread.yield();
}
public SynchronizedBlocked() {
new Thread() {
public void run() {
f(); // Lock acquired by this thread
}
}.start();
}
public void run() {
print("Trying to call f()");
f();
print("Exiting SynchronizedBlocked.run()");
}
}
public class Interrupting {
private static ExecutorService exec =
Executors.newCachedThreadPool();
static void test(Runnable r) throws InterruptedException {
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
print("Interrupting " + r.getClass().getName());
f.cancel(true); // Interrupts if running
print("Interrupt sent to " + r.getClass().getName());
}
public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
TimeUnit.SECONDS.sleep(3);
print("Aborting with System.exit(0)");
System.exit(0); // ... since last 2 interrupts failed
}
} /*
Interrupting SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Interrupt sent to SleepBlocked
Waiting for read():
Interrupting IOBlocked
Interrupt sent to IOBlocked
Trying to call f()
Interrupting SynchronizedBlocked
Interrupt sent to SynchronizedBlocked
Aborting with System.exit(0)
*/

SleepBlock是可中断的阻塞,而IOBlocked和SynchronizedBlocked是不可中断的阻塞。证明IO和synchronized块上的等待是不可中断的(不需要任何InterruptedException处理器)。

对于这类问题,有一个笨拙但行之有效的解决方案,即关闭任务在其上发生阻塞的底层资源:

public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InputStream socketInput =
new Socket("localhost", 8080).getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(100);
print("Shutting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
print("Closing " + socketInput.getClass().getName());
socketInput.close(); // Releases blocked thread
TimeUnit.SECONDS.sleep(1);
print("Closing " + System.in.getClass().getName());
System.in.close(); // Releases blocked thread
}
} /*
Waiting for read():
Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from blocked I/O
Exiting IOBlocked.run()
Closing java.io.BufferedInputStream
Exiting IOBlocked.run()
*/

在shutdownNow()被调用之后以及在两个输入流上调用colse()之前的延迟强调的是一旦底层资源被关闭,任务将解除阻塞。interrupt()看起来发生在关闭Socket而不是关闭System.in的时刻。

各种nio类提供了更人性化的IO中断:

class NIOBlocked implements Runnable {
private final SocketChannel sc;
public NIOBlocked(SocketChannel sc) {
this.sc = sc;
}
public void run() {
try {
print("Waiting for read() in " + this);
sc.read(ByteBuffer.allocate(1));
} catch (ClosedByInterruptException e) {
print("ClosedByInterruptException");
} catch (AsynchronousCloseException e) {
print("AsynchronousCloseException");
} catch (IOException e) {
throw new RuntimeException(e);
}
print("Exiting NIOBlocked.run() " + this);
}
}
public class NIOInterruption {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
ServerSocket server = new ServerSocket(8080);
InetSocketAddress isa =
new InetSocketAddress("localhost", 8080);
SocketChannel sc1 = SocketChannel.open(isa);
SocketChannel sc2 = SocketChannel.open(isa);
Future<?> f = exec.submit(new NIOBlocked(sc1));
exec.execute(new NIOBlocked(sc2));
exec.shutdown();
TimeUnit.SECONDS.sleep(1);
// Produce an interrupt via cancel:
f.cancel(true);
TimeUnit.SECONDS.sleep(1);
// Release the block by closing the channel:
sc2.close();
}
} /*
Waiting for read() in NIOBlocked@7a84e4
Waiting for read() in NIOBlocked@15c7850
ClosedByInterruptException
Exiting NIOBlocked.run() NIOBlocked@15c7850
AsynchronousCloseException
Exiting NIOBlocked.run() NIOBlocked@7a84e4
*/

还可以关闭底层资源以释放锁。注意,使用execute()来启动两个任务,并调用e.shutdownNow()将可以很容易地终止所有事物,而对于捕获上面实例中的Future,只有在将中断发送给一个线程,同时不发生给另一个线程时才是必须的。

被互斥所阻塞

对象锁已经被其他任务获得,那么调用任务将被挂起(阻塞),直至这个锁可获得。

public class MultiLock {
public synchronized void f1(int count) {
if (count-- > 0) {
print("f1() calling f2() with count " + count);
f2(count);
}
}
public synchronized void f2(int count) {
if (count-- > 0) {
print("f2() calling f1() with count " + count);
f1(count);
}
}
public static void main(String[] args) throws Exception {
final MultiLock multiLock = new MultiLock();
new Thread() {
public void run() {
multiLock.f1(10);
}
}.start();
}
} /*
f1() calling f2() with count 9
f2() calling f1() with count 8
f1() calling f2() with count 7
f2() calling f1() with count 6
f1() calling f2() with count 5
f2() calling f1() with count 4
f1() calling f2() with count 3
f2() calling f1() with count 2
f1() calling f2() with count 1
f2() calling f1() with count 0
*/

由于这个任务已经在第一个对f1的调用中获得了munltiLock对象锁,因此同一任务将在对f2的调用中再次获取这个锁。以此类推。

只要任务以不可中断的方式被阻塞,那么都有潜在的会锁住程序的可能。Java SE5并发库添加了一个特性,即在ReentrantLock上阻塞任务具备可以被中断的能力,这与synchronized方法或临界区上阻塞的任务完全不同:

class BlockedMutex {
private Lock lock = new ReentrantLock();
public BlockedMutex() {
// Acquire it right away, to demonstrate interruption
// of a task blocked on a ReentrantLock:
lock.lock();
}
public void f() {
try {
// This will never be available to a second task
lock.lockInterruptibly(); // Special call
print("lock acquired in f()");
} catch (InterruptedException e) {
print("Interrupted from lock acquisition in f()");
}
}
}
class Blocked2 implements Runnable {
BlockedMutex blocked = new BlockedMutex();
public void run() {
print("Waiting for f() in BlockedMutex");
blocked.f();
print("Broken out of blocked call");
}
}
public class Interrupting2 {
public static void main(String[] args) throws Exception {
Thread t = new Thread(new Blocked2());
t.start();
TimeUnit.SECONDS.sleep(1);
System.out.println("Issuing t.interrupt()");
t.interrupt();
}
} /*
Waiting for f() in BlockedMutex
Issuing t.interrupt()
Interrupted from lock acquisition in f()
Broken out of blocked call
*/

在Blocked2中,run()方法总是在调用blocked.f()的地方停止。与IO调用不同,interrupt()可以打断被互斥所阻塞的调用。

检查中断

在线程上调用interrupt()时,中断发生的唯一时刻是在任务要进入到阻塞操作中,或者已经在阻塞操作内部时。但如果已经编写了会产生阻塞的代码,要调用interrupt()以停止某个任务,那么在run()循环碰巧没有任何阻塞调用,这种机会是由中断状态来表示的,其状态可以通过调用interrupt()来设置,可以通过调用interrupted()来检查中断状态,这不仅可以知道interrupt()是否被调用过,而且还可以清除中断状态。
在run()方法中使用它来处理在中断状态被设置时,被阻塞和不被阻塞的各种可能:

class NeedsCleanup {
private final int id;
public NeedsCleanup(int ident) {
id = ident;
print("NeedsCleanup " + id);
}
public void cleanup() {
print("Cleaning up " + id);
}
}
class Blocked3 implements Runnable {
private volatile double d = 0.0;
public void run() {
try {
while (!Thread.interrupted()) {
// point1
NeedsCleanup n1 = new NeedsCleanup(1);
// Start try-finally immediately after definition
// of n1, to guarantee proper cleanup of n1:
try {
print("Sleeping");
TimeUnit.SECONDS.sleep(1);
// point2
NeedsCleanup n2 = new NeedsCleanup(2);
// Guarantee proper cleanup of n2:
try {
print("Calculating");
// A time-consuming, non-blocking operation:
for (int i = 1; i < 2500000; i++)
d = d + (Math.PI + Math.E) / d;
print("Finished time-consuming operation");
} finally {
n2.cleanup();
}
} finally {
n1.cleanup();
}
}
print("Exiting via while() test");
} catch (InterruptedException e) {
print("Exiting via InterruptedException");
}
}
}
public class InterruptingIdiom {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
print("usage: java InterruptingIdiom delay-in-mS");
System.exit(1);
}
Thread t = new Thread(new Blocked3());
t.start();
TimeUnit.MILLISECONDS.sleep(new Integer(args[0]));
t.interrupt();
}
} /*
NeedsCleanup 1
Sleeping
NeedsCleanup 2
Calculating
Finished time-consuming operation
Cleaning up 2
Cleaning up 1
NeedsCleanup 1
Sleeping
Cleaning up 1
Exiting via InterruptedException
*/

如果interrupt()在注释point2之后(即在非阻塞的操作过程中)被调用,那么首先循环将结束,然后所有的本地对象将被销毁,最后循环会经由while语句的顶部退出。但是,如果interrupt()在point1和point2之间(在while语句之后,但是在阻塞操作sleep()之前或其过程中)被调用,那么这个任务就会在第一次试图调用阻塞操作之前,经由InterruptedException退出。在这种情况下,在异常被抛出之时唯一被创建出来的NeedsCleanup对象将被清除,而你也就有了在catch子句中执行其他任何清除工作的机会。

线程之间的协作

互斥用来解决资源共享问题。
协作,多个任务可以一起工作去解决某个问题,关键问题是这些任务之间的握手。为了实现握手,使用相同的基本特性:互斥,互斥能够确保只有一个任务可以相应某个信号,这样就可以根除任何可能的竞争条件。

wait()与notifyAll()

wait()等待某个条件发生变化,而改变这个条件将由另一个任务完成。wait()会在等待外部世界产生变化的时候将任务挂起,并且只有在notify()或notifyAll()发生时,这个任务才会被唤醒并去检查所产生的变化。调用sleep()和yield()并没有释放锁。
有两种wait(),一是接受毫秒数作为参数,在此期间暂停,二是不接收任何参数,将无限等待下去,直到线程接收到notify()或notifyAll()消息。实际上,只能在同步控制方法或同步控制块里调用wait()notify()和notifyAll()。如果在非同步快在调用,将得到IllegalMonitorStateException异常,也就是说在调用方法前必须拥有对象的锁。

Car涂蜡,抛光:

class Car {
private boolean waxOn = false;
public synchronized void waxed() {
waxOn = true; // Ready to buff
notifyAll();
}
public synchronized void buffed() {
waxOn = false; // Ready for another coat of wax
notifyAll();
}
public synchronized void waitForWaxing()
throws InterruptedException {
while (waxOn == false)
wait();
}
public synchronized void waitForBuffing()
throws InterruptedException {
while (waxOn == true)
wait();
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5); // Run for a while...
exec.shutdownNow(); // Interrupt all tasks
}
} /*
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
*/

必须用一个检查感兴趣的条件的while循环包围wait(),其本质就是要检查所有感兴趣的特定条件,并在条件不满足的情况下返回到wait()中。

错失的信号

当两个线程使用notify()/wait()或notifyAll()/wait()进行协作时,有可能会错过某个信号。假设T1是通知T2的线程,而这两个线程都是使用下面(有缺陷的)方式实现的:

T1:
synchronized(sharedMonitor) {
<setup condition for T2>
sharedMonitor.notify();
}
T2:
while(someCondition){
synchronized(sharedMonitor) {
// Point1
sharedMonitor.wait();
}
}

是防止T2调用wait()的一个动作,当然前提是T2还没有调用wait()。
假设T2对someCondition求值并发现其为true。在Point1,线程调度器可能切换到了T1。而T1将执行其设置,然后调用notify()。当T2得以执行时,此时对T2来说,时机已经太晚了,以至于不能意识到这个条件已经发生了变化,因此会盲目进入wait()。此时notify()将错失,而T2也将无限地等待这个已经发送过的信号,从而产生死锁。
T2正确的执行方式:

synchronized(sharedMonitor) {
while(someCondition){
sharedMonitor.wait();
}
}

现在,如果T1首先执行,当控制返回T2时,他会发现条件发生了变化,从而不会进入wait()。反过来,如果T2先执行,它将进入wait(),并稍后会由T1唤醒。因此,信号不会错失。

notify()与notifyAll()

可能有多个任务在单个Car对象上处于wait()状态,因此调用notifyAll()比只调用notify()更安全。notify()并不是notifyAll的一种优化,notify()在众多等待同一个锁的任务中只有一个会被唤醒。
那是否意味着在程序中任何地方,任何处于wait()状态中的任务都将被任何对notifyAll的调用唤醒呢?事实上,当notifyAll()因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒:

class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.print(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
// OK to exit this way
}
}
synchronized void prod() {
notify();
}
synchronized void prodAll() {
notifyAll();
}
}
class Task implements Runnable {
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall();
}
}
class Task2 implements Runnable {
// A separate Blocker object:
static Blocker blocker = new Blocker();
public void run() {
blocker.waitingCall();
}
}
public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Task());
exec.execute(new Task2());
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true;
public void run() {
if (prod) {
System.out.print("nnotify() ");
Task.blocker.prod();
prod = false;
} else {
System.out.print("nnotifyAll() ");
Task.blocker.prodAll();
prod = true;
}
}
}, 400, 400); // Run every .4 second
TimeUnit.SECONDS.sleep(5); // Run for a while...
timer.cancel();
System.out.println("nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.print("Task2.blocker.prodAll() ");
Task2.blocker.prodAll();
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("nShutting down");
exec.shutdownNow(); // Interrupt all tasks
}
} /*
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
Timer canceled
Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
Shutting down
*/

从输出上看,即使存在Task2.blocker上阻塞的Task2对象,也没有任何在Task.blocker上的notify()或notifyAll()调用会导致Task2对象被唤醒。与此类似,在main()的结尾,调用了timer的cancel(),即使计时器被撤销,前5个任务也仍然在运行,并仍旧在它们对Task.blocker.waitingCall()的调用中被阻塞。对Task2.blocker.prodAll()的调用所产生的输出不包括任何在Task.blocker()中的锁上等待的任务。

生产者与消费者

厨师代表生产者,而服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须以有序的方式关闭:

class Meal {
private final int orderNum;
public Meal(int orderNum) {
this.orderNum = orderNum;
}
public String toString() {
return "Meal " + orderNum;
}
}
class WaitPerson implements Runnable {
private Restaurant restaurant;
public WaitPerson(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null)
wait(); // ... for the chef to produce a meal
}
print("Waitperson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); // Ready for another
}
}
} catch (InterruptedException e) {
print("WaitPerson interrupted");
}
}
}
class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0;
public Chef(Restaurant r) {
restaurant = r;
}
public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait(); // ... for the meal to be taken
}
if (++count == 10) {
print("Out of food, closing");
restaurant.exec.shutdownNow();
}
printnb("Order up! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
print("Chef interrupted");
}
}
}
public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this);
Chef chef = new Chef(this);
public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
}
public static void main(String[] args) {
new Restaurant();
}
} /*
Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
WaitPerson interrupted
Order up! Chef interrupted
*/

对于一个任务而言,只有一个单一的地点用于存放对象,从而使得另一个任务稍后可以使用这个对象。但是,在典型的生产者-消费者实现中,应使用先进先出队列来存储被生产和消费的对象。

使用显式的Lock和Condition对象

在Java SE5的java.util.concurrent类库中还有额外的显式工具可以用来重写WaxOMatic.java。使用互斥并允许任务挂起的基本类是Condition,可以通过在Condition上调用await()来挂起一个任务。当外部条件发生变化,意味着某个任务应该继续执行时,可以通过调用signal()来通知这个任务,从而唤醒一个任务,或者调用signalAll()来唤醒所有在这个Condition上被其自身挂起的任务。

class Car {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private boolean waxOn = false;
public void waxed() {
lock.lock();
try {
waxOn = true; // Ready to buff
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed() {
lock.lock();
try {
waxOn = false; // Ready for another coat of wax
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing() throws InterruptedException {
lock.lock();
try {
while (waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing() throws InterruptedException {
lock.lock();
try {
while (waxOn == true)
condition.await();
} finally {
lock.unlock();
}
}
}
class WaxOn implements Runnable {
private Car car;
public WaxOn(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
}
class WaxOff implements Runnable {
private Car car;
public WaxOff(Car c) {
car = c;
}
public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
}
public class WaxOMatic2 {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOff(car));
exec.execute(new WaxOn(car));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
} /*
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax Off task
Exiting via interrupt
Ending Wax On task
*/

在Car的构造器中,单个的Lock将产生一个Condition独享,这个对象被用来管理任务间的通信。但是这个Condition对象不包含任何有关处理状态的信息,因此需要管理额外的表示处理状态的信息,即boolean waxOn。
每个队lock()的调用都必须紧跟一个try-finally子句,用来保证在所有情况下都可以释放锁。在使用内建版本时,任务在可以调用await()、signal()或signalAll()之前,必须拥有这个锁。

生产者-消费者与队列

使用同步队列来解决任务协作问题,同步队列在任何时刻都只允许一个任务插入或移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的标准实现。可以使用LinkedBlockingQueue,这是一个无届队列,还可以使用ArrayBlockingQueue,他具有固定的尺寸,因此可以在被阻塞之前,向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列此时为空,那么这些队列还可以挂起消费者任务,并且当有更多的元素可用时恢复消费者任务。阻塞队列可以解决非常大量的问题,而其方式与wait()和notifyAll()相比,则简单并可靠得多。

class LiftOffRunner implements Runnable {
private BlockingQueue<LiftOff> rockets;
public LiftOffRunner(BlockingQueue<LiftOff> queue) {
rockets = queue;
}
public void add(LiftOff lo) {
try {
rockets.put(lo);
} catch (InterruptedException e) {
print("Interrupted during put()");
}
}
public void run() {
try {
while (!Thread.interrupted()) {
LiftOff rocket = rockets.take();
rocket.run(); // Use this thread
}
} catch (InterruptedException e) {
print("Waking from take()");
}
print("Exiting LiftOffRunner");
}
}
public class TestBlockingQueues {
static void getkey() {
try {
// Compensate for Windows/Linux difference in the
// length of the result produced by the Enter key:
new BufferedReader(
new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
static void getkey(String message) {
print(message);
getkey();
}
static void
test(String msg, BlockingQueue<LiftOff> queue) {
print(msg);
LiftOffRunner runner = new LiftOffRunner(queue);
Thread t = new Thread(runner);
t.start();
for (int i = 0; i < 5; i++)
runner.add(new LiftOff(5));
getkey("Press 'Enter' (" + msg + ")");
t.interrupt();
print("Finished " + msg + " test");
}
public static void main(String[] args) {
test("LinkedBlockingQueue", // Unlimited size
new LinkedBlockingQueue<LiftOff>());
test("ArrayBlockingQueue", // Fixed size
new ArrayBlockingQueue<LiftOff>(3));
test("SynchronousQueue", // Size of 1
new SynchronousQueue<LiftOff>());
}
}

各个任务有main()放置到了BlockingQueue中,并且有LiftOffRunner从BlockingQueue中取出。注意,LiftOffRunner可以忽略同步问题,因为它们已经有BlockingQueue解决了。

吐司BlockingQueue

一台机器具有三个任务:制作吐司,给吐司抹黄油,在抹过黄油的吐司上涂果酱:

class Toast {
public enum Status {DRY, BUTTERED, JAMMED}
private Status status = Status.DRY;
private final int id;
public Toast(int idn) {
id = idn;
}
public void butter() {
status = Status.BUTTERED;
}
public void jam() {
status = Status.JAMMED;
}
public Status getStatus() {
return status;
}
public int getId() {
return id;
}
public String toString() {
return "Toast " + id + ": " + status;
}
}
class ToastQueue extends LinkedBlockingQueue<Toast> {
}
class Toaster implements Runnable {
private ToastQueue toastQueue;
private int count = 0;
private Random rand = new Random(47);
public Toaster(ToastQueue tq) {
toastQueue = tq;
}
public void run() {
try {
while (!Thread.interrupted()) {
TimeUnit.MILLISECONDS.sleep(
100 + rand.nextInt(500));
// Make toast
Toast t = new Toast(count++);
print(t);
// Insert into queue
toastQueue.put(t);
}
} catch (InterruptedException e) {
print("Toaster interrupted");
}
print("Toaster off");
}
}
// Apply butter to toast:
class Butterer implements Runnable {
private ToastQueue dryQueue, butteredQueue;
public Butterer(ToastQueue dry, ToastQueue buttered) {
dryQueue = dry;
butteredQueue = buttered;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = dryQueue.take();
t.butter();
print(t);
butteredQueue.put(t);
}
} catch (InterruptedException e) {
print("Butterer interrupted");
}
print("Butterer off");
}
}
// Apply jam to buttered toast:
class Jammer implements Runnable {
private ToastQueue butteredQueue, finishedQueue;
public Jammer(ToastQueue buttered, ToastQueue finished) {
butteredQueue = buttered;
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = butteredQueue.take();
t.jam();
print(t);
finishedQueue.put(t);
}
} catch (InterruptedException e) {
print("Jammer interrupted");
}
print("Jammer off");
}
}
// Consume the toast:
class Eater implements Runnable {
private ToastQueue finishedQueue;
private int counter = 0;
public Eater(ToastQueue finished) {
finishedQueue = finished;
}
public void run() {
try {
while (!Thread.interrupted()) {
// Blocks until next piece of toast is available:
Toast t = finishedQueue.take();
// Verify that the toast is coming in order,
// and that all pieces are getting jammed:
if (t.getId() != counter++ ||
t.getStatus() != Toast.Status.JAMMED) {
print(">>>> Error: " + t);
System.exit(1);
} else
print("Chomp! " + t);
}
} catch (InterruptedException e) {
print("Eater interrupted");
}
print("Eater off");
}
}
public class ToastOMatic {
public static void main(String[] args) throws Exception {
ToastQueue dryQueue = new ToastQueue(),
butteredQueue = new ToastQueue(),
finishedQueue = new ToastQueue();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Toaster(dryQueue));
exec.execute(new Butterer(dryQueue, butteredQueue));
exec.execute(new Jammer(butteredQueue, finishedQueue));
exec.execute(new Eater(finishedQueue));
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow();
}
}

Toast是一个使用enum值的优秀实例。这个实例中没有任何显式的同步(即使用Lock对象或synchronized关键字),因为同步由队列(其内部是同步的)和系统的设计隐式地管理了——每片Toast在任何时刻都只由一个任务在操作。因为队列的阻塞,使得处理过程将被自动地挂起和恢复。可以看到由BlockingQueue产生的简化十分明显。在使用显式的wait()和notifyAll()时存在的类和类之间的耦合被消除了,因为每个类都只和它的BlockingQueue通信。

任务间使用管道进行输入/输出

通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以通道的形式对线程间的输入/输出提供了支持。它们在Java输入/输出类库中的对应物就是PipedWriter类(允许任务向通道写)和PipedReader类(允许不同任务从同一管道中读取)。

class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter() {
return out;
}
public void run() {
try {
while (true)
for (char c = 'A'; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
}
} catch (IOException e) {
print(e + " Sender write exception");
} catch (InterruptedException e) {
print(e + " Sender sleep interrupted");
}
}
}
class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run() {
try {
while (true) {
// Blocks until characters are there:
printnb("Read: " + (char) in.read() + ", ");
}
} catch (IOException e) {
print(e + " Receiver read exception");
}
}
}
public class PipedIO {
public static void main(String[] args) throws Exception {
Sender sender = new Sender();
Receiver receiver = new Receiver(sender);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep(4);
exec.shutdownNow();
}
} /*
Read: A, Read: B, Read: C, Read: D, Read: E, Read: F, Read: G, Read: H, Read: I, Read: J, Read: K, Read: L, Read: M, java.lang.InterruptedException: sleep interrupted Sender sleep interrupted
java.io.InterruptedIOException Receiver read exception
*/

Sender把数据放进Writer,然后休眠一段时间(随机数)。然而,Receiver没有sleep()和wait()。但当它调用read()时,如果没有更多的数据,管道将自动阻塞。
如果启动一个没有构造完毕的对象,在不同的平台上管道可能会产生不一致的行为。
在shutdownNow()被调用时,可以看到PipedReader与普通IO之间的重要差异——PipedReader是可中断的,如果将in.read()调用修改为System.in.read(),那么interrupt()将不能打断read()调用。

最后

以上就是洁净绿茶为你收集整理的Java编程思想笔记——并发3的全部内容,希望文章能够帮你解决Java编程思想笔记——并发3所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(201)

评论列表共有 0 条评论

立即
投稿
返回
顶部