概述
文章目录
- Future、FutureTask和CompletableFuture
- Future
- FutureTask
- CompletableFuture
- CountDownLatch
- CyclicBarrier
- Semaphore
Future、FutureTask和CompletableFuture
Future
平时在使用多线程的时候, 是拿不到线程返回值的, 当需要拿到返回值时,需要实现Callable来获取
- Callable接口定义
public interface Callable<V> {
V call() throws Exception;
}
可以通过调用ExecutorService.submit()获取返回值:
<T> Future<T> submit(Callable<T> task);
- Future 持有线程, 可对线程做取消,获取等操作
public interface Future<V> {
// 取消线程,
// mayInterruptIfRunning: 是否中断正在执行的线程
boolean cancel(boolean mayInterruptIfRunning);
// 判断线程是否被取消
boolean isCancelled();
// 是否已执行结束(完成/取消/异常)返回true
boolean isDone();
// 获取执行结果, 会阻塞等待线程执行完成
V get() throws InterruptedException, ExecutionException;
// 获取执行结果, 阻塞一定时间后返回null
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
使用时, 先定义一个任务对象, 实现 Callable 接口:
public class Task implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println("Task call");
return "Task";
}
}
通过 ExecutorService 来获取返回值
// 创建线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 创建任务
Task task = new Task();
// 获取结果
Future<String> result = executor.submit(task);
// 关闭线程池
executor.shutdown();
FutureTask
Future 接口的具体实现, 常用方法如下:
public FutureTask(Callable<V> callable);
public FutureTask(Runnable runnable, V result);
使用示例:
// 创建任务
Task task = new Task();
FutureTask<String> futureTask = new FutureTask<String>(task);
// 构造一个新线程并注入FutureTask
Thread thread = new Thread(futureTask);
// 启动线程
thread.start();
// 获取返回值
if(futureTask.get()!=null) {
System.out.println("返回值:"+futureTask.get());
} else {
System.out.println("未获取到返回值");
}
CompletableFuture
JDK1.8 提供, Future的另外一种实现, 也是业务中使用比较多的方式;一个CompletableFuture 代表一个任务, 它解决了Future调用get()时阻塞以及需要不断轮询isDone()的问题.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Demon-HY
* @date 2021/6/4
*/
public class CompletableFutureTest {
// 获取用户信息
public Map<String, Object> getUser() {
Map<String, Object> user = new HashMap<>();
user.put("name", "Demon-HY");
return user;
}
// 获取用户实名信息
public Map<String, Object> getUserReal() {
Map<String, Object> userReal = new HashMap<>();
userReal.put("realName", "何三岁");
userReal.put("idCard", "622628200011052364");
return userReal;
}
public static void main(String[] args) {
CompletableFutureTest completableFutureTest = new CompletableFutureTest();
// 创建一个定长的线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
// 一个 CompletableFuture 就是一个任务
CompletableFuture<Map<String, Object>> userFuture =
CompletableFuture.supplyAsync(completableFutureTest::getUser, executor);
CompletableFuture<Map<String, Object>> userRealFuture =
CompletableFuture.supplyAsync(completableFutureTest::getUserReal, executor);
try {
// 执行两个任务
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(userFuture, userRealFuture);
// 所有任务都已经执行完成
completableFuture.get();
Map<String, Object> user = userFuture.get();
Map<String, Object> userReal = userRealFuture.get();
System.out.println(user);
System.out.println(userReal);
// 关闭线程池
executor.shutdown();
} catch (InterruptedException | ExecutionException e) {
System.err.println("获取用户信息失败");
}
}
}
# 输出结果
{name=Demon-HY}
{realName=何三岁, idCard=622628200011052364}
CountDownLatch
通过计数器实现来控制线程的执行间隔, 一般计数器的初始值设置为线程的数量, 每当一个线程执行完时计数器减1,当计数器值为0时, 表示所有的线程都已执行完毕.
适用场景:
- 当主线程需要等待所有的子线程执行完毕
// 设置计数器的值为5
CountDownLatch countDownLatch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
final int index = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "开始执行");
// 计数器减1
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 主线程阻塞, 当计数器为0时, 就唤醒主线程往下执行
countDownLatch.await();
System.out.println("所有子线程执行完成");
- 让多个线程等待,模拟并发场景,让线程一起执行
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
// 等待countDownLatch为0
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "开始执行");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
// 等待线程准备就绪
Thread.sleep(2000);
// 触发线程执行
countDownLatch.countDown();
CyclicBarrier
回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行,叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
CountDownLatch 会阻塞主线程, CyclicBarrier 只会阻塞子线程.
CyclicBarrier 成员变量:
//同步操作锁
private final ReentrantLock lock = new ReentrantLock();
//线程拦截器
private final Condition trip = lock.newCondition();
//每次拦截的线程数
private final int parties;
//换代前执行的任务
private final Runnable barrierCommand;
//表示栅栏的当前代
private Generation generation = new Generation();
//计数器
private int count;
典型应用:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author Demon-HY
* @date 2021/6/4
*/
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
System.out.println("所有任务都执行完成");
});
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务1准备就绪");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("任务1执行完成");
});
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("任务2准备就绪");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("任务2执行完成");
});
thread1.start();
thread2.start();
}
}
Semaphore
限制并发线程的数量, 同一时间内最多只允许N(许可数量,初始化时指定)个线程执行 acquire()和release()之间的代码。
Semaphore 核心方法:
package java.util.concurrent;
import java.util.Collection;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class Semaphore implements java.io.Serializable {
// 信号锁
private final Sync sync;
// 信号锁抽象类
abstract static class Sync extends AbstractQueuedSynchronizer {
// 表示同一时间内最多只允许 permits 个线程执行 acquire()和release()之间的代码
Sync(int permits) {
setState(permits);
}
}
// 非公平锁: 哪个线程先获得锁由CPU调度控制
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
}
// 公平锁: 所谓的公平信号量是获得锁的顺序与线程启动的顺序有关,但不代表100%获得信号量,仅仅是在概率上能保证
static final class FairSync extends Sync {
FairSync(int permits) {
super(permits);
}
}
// 默认实现为公平锁
// permits: 许可证数量, 表示同一时间内最多只允许 permits 个线程执行 acquire()和release()之间的代码
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 可以指定使用公平锁实现或者非公平锁实现
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 获取一个许可证, 阻塞等待
public void acquire() throws InterruptedException;
// 获取一个许可证, 等待进入acquire() 方法的线程不允许被中断
public void acquireUninterruptibly();
// 获取一个许可证,获取不到直接返回
public boolean tryAcquire();
// 获取一个许可证,重试一段时间后返回
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
// 释放许可证
public void release();
// 获取当前可以用的许可数
public int availablePermits();
// 获取并返回所有的许可个数,并且将可用的许可重置为0
public int drainPermits();
// 减少许可证的数量, 它不会影响到等待许可证的线程
protected void reducePermits(int reduction);
// 判断是否为公平锁
public boolean isFair();
// 判断有没有线程在等待许可证
public final boolean hasQueuedThreads();
// 获取等待许可证的线程个数
public final int getQueueLength() {
return sync.getQueueLength();
}
// 获取等待的线程
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
}
最后
以上就是闪闪大门为你收集整理的Java - 多线程并发包下常用类 Future CountDownLatch CyclicBarrier SemaphoreFuture、FutureTask和CompletableFutureCountDownLatchCyclicBarrierSemaphore的全部内容,希望文章能够帮你解决Java - 多线程并发包下常用类 Future CountDownLatch CyclicBarrier SemaphoreFuture、FutureTask和CompletableFutureCountDownLatchCyclicBarrierSemaphore所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复