概述
一:Java8的函数式编程
1. 特点
- 将函数作为参数传递给另外一个函数
- 将函数作为另外一个函数的返回值
- 避免修改函数的外部状态
- 声明式编程,不需要提供明确的指令操作
- 传递的对象不变,易于并行
2. 基本概念
函数式接口
只定义单一抽象方法(被Object实现的方法不是抽象方法)的接口,可以存在实例方法
@FunctionalInterface
public interface Handler {
void handle();
}
接口默认方法
接口可以包含多个默认实例方法,这样类可以通过多重实现接口,实现多重继承
@FunctionalInterface
public interface Handler {
void handle();
default void myfunction(){
System.out.println("Hello");
}
}
如果实现的多个接口具有同名的实例方法,则需要方法绑定
package function;
@FunctionalInterface
interface Handler1 {
void handle1();
default void myfunction(){
System.out.println("Hello1");
}
}
@FunctionalInterface
interface Handler2 {
void handle2();
default void myfunction(){
System.out.println("Hello2");
}
}
public class FunctionTest implements Handler1,Handler2 {
@Override
public void handle1() {
System.out.println("类实现的handle1");
}
@Override
public void handle2() {
System.out.println("类实现的handle2");
}
//方法绑定
@Override
public void myfunction() {
Handler2.super.myfunction();
}
public static void main(String[] args) {
FunctionTest test = new FunctionTest();
test.handle1();
test.handle2();
test.myfunction();
}
}
lambda表达式
即匿名函数,可以作为参数直接传递给调用者,可以访问外部的常量
结构:(参数)-> 函数体
方法引用
通过类名和方法名来定位一个静态方法或实例方法,系统自动判断流中的元素(实例或类)是调用方法的调用目标还是调用方法传人的调用参数
- 引用静态方法 ClassName::staticMethodName
- 引用某个对象的实例方法 Object::instanceMethodName
- 引用某个类型的任意对象的实例方法 ClassName::methodName
- 引用超类的实例方法 super::methodName
- 引用构造方法 ClassName::new
- 引用数组构造方法:TypeName[]::new
如果一个类存在同名的实例方法和静态方法,则会出现编译错误
流对象
类似于集合或数组,是一个对象的集合。通过流对象可以方便的处理流内的元素
List list = new ArrayList();
list.stream();
int[] arr = new int[10];
Arrays.stream(arr);
3. 并行流
(1)获取流
list.stream().parallel(); / list.parallelStream();
Arrays.stream(arr).parallel();
(2)数组并行
//并行排序,并且可以指定范围
Arrays.parallelSort();
//对数组的每一个值进行计算并更新
Arrays.parallelSetAll();
//将数组中每个元素替换为指定关联操作前缀的积累
Arrays.parallelPrefix();
4. CompletableFuture
(1)可等待
如果CompletableFuture没有需要的数据,则线程会进入等待
package function;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest {
private static class AskThread extends Thread{
CompletableFuture<Integer> future;
public AskThread(CompletableFuture<Integer> future) {
this.future = future;
}
//如果数据没有准备好,则会阻塞
@Override
public void run() {
try {
System.out.println("get data:"+future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
CompletableFuture<Integer> future = new CompletableFuture<>();
AskThread thread = new AskThread(future);
thread.start();
Thread.sleep(3000);
future.complete(5);
}
}
(2)异步调用
在异步调用后可以立即返回CompletableFuture实例使用,如果需要立即获得CompletableFuture的数据,而数据还没有准备好,才需要等待
- supplyAsync(线程池):返回值
- runAsync(线程池):没有返回值
默认在ForkJoinPool.commonPool中,所有线程都是守护线程
package function;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class CompletableFutureTest {
private static int async(int data){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return data;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(()->async(10));
System.out.println("立即返回:"+future);
System.out.println(future.get());
}
}
(3)流式调用
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(()->async(10))
.thenApply((i) -> i++)
.thenApply((str) -> """+str+""")
.thenAccept(System.out::println);
System.out.println("立即返回:"+future);
System.out.println(future.get());
(4)捕获异常
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(()->async(10))
.exceptionally(ex-> {
System.out.println(ex.toString());
return 0;
})
.thenApply((i) -> i++)
.thenApply((str) -> """+str+""")
.thenAccept(System.out::println);
(5)组合
thenCompose
将执行结果传递给下一个CompletableFuture
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(()->async(10))
.exceptionally(ex-> {
System.out.println(ex.toString());
return 0;
})
.thenCompose((i)->CompletableFuture.supplyAsync(()->async(i)))
.thenApply((str) -> """+str+""")
.thenAccept(System.out::println);
thenCombine
对两个CompletableFuture的执行结果进行操作
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(()->async(10));
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(()->async(20));
CompletableFuture<Void> result = future1.thenCombine(future2,(i,j)->i+j)
.thenApply((str) -> """+str+""")
.thenAccept(System.out::println);
result.get();
(6)指定时间
指定future在一定时间内执行,超时则抛出异常
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(()->async(10))
.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex-> {
System.out.println(ex.toString());
return 0;
})
//
.thenCompose((i)->CompletableFuture.supplyAsync(()->async(i)))
.thenApply((str) -> """+str+""")
.thenAccept(System.out::println);
System.out.println("立即返回:"+future);
System.out.println(future.get());
5. 改进读写锁——StampedLock
乐观的读策略,读锁不会阻塞写锁
(1)使用
import java.util.concurrent.locks.StampedLock;
public class StampedLockTest {
//数据
private int data;
//锁
private StampedLock lock;
//写锁,排他锁
public void write(int data){
long stamp = lock.writeLock();
try {
this.data = data;
}
finally {
lock.unlockWrite(stamp);
}
}
//读锁,共享锁
public int read(){
//尝试乐观读
long stamp = lock.tryOptimisticRead();
int current = data;
//判断在读的时候有没有被写
if (!lock.validate(stamp)){
//如果被修改,可以在死循环中不断读,直到成功
//或者升级锁的级别,变成悲观锁,如果当前数据被修改则挂起
stamp = lock.readLock();
try {
current = data;
}finally {
lock.unlockRead(stamp);
}
}
return current;
}
}
(2)注意
StampedLock使用死循环反复尝试的策略
- 挂起线程时,使用Unsafe.park(),遇到中断直接中断
- 死循环没有处理中断,导致不断循环
public long readLock() {
long s = state, next;
// bypass acquireRead on common uncontended case
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
(3)实现原理
基于CLH锁(自旋锁),保证没有饥饿发生和先来先服务顺序
锁维护了一个等待线程队列,记录申请锁失败的线程。每一个结点代表一个线程,保存一个标记位,判断线程是否释放锁
- 线程申请锁,把队列的尾部节点作为其前序节点,如果前序节点没有释放锁,则自旋等待;反之可以执行
- 线程释放锁,将自身的标记位置为false
static final class WNode {
volatile WNode prev;
volatile WNode next;
volatile WNode cowait;
// list of linked readers
volatile Thread thread;
// non-null while possibly parked
volatile int status;
// 0, WAITING, or CANCELLED
final int mode;
// RMODE or WMODE
WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;
写锁:
public long writeLock() {
long s, next;
// bypass acquireWrite in fully unlocked case only
//设置state的写锁位为1
return ((((s = state) & ABITS) == 0L &&
U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
next : acquireWrite(false, 0L));
}
乐观读:
public long tryOptimisticRead() {
long s;
//判断state的写锁位=0
return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
校准:
public boolean validate(long stamp) {
U.loadFence();
return (stamp & SBITS) == (state & SBITS);
}
悲观锁:
public long readLock() {
long s = state, next;
// bypass acquireRead on common uncontended case
//尝试state加1
return ((whead == wtail && (s & ABITS) < RFULL &&
U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
next : acquireRead(false, 0L));
}
- 如果悲观锁失败,则进入自旋
- 多次自旋失败后启用CLH队列,再自旋
- 自旋失败后,通过Unsafe.park()方法挂起线程
- 获取锁成功后,激活自己的所有读线程
6. 增强原子类
(1)LongAdder
在一般的原子类中,修改数据时会陷入死循环,直到修改成功,如果有大量线程竞争,则修改失败的几率高
通过热点分离,把数据分解成多个单元,线程访问某个单元并修改单元值,单元值的合成就是该数据的值
- 把所有数据记录在一个变量中,如果对变量的修改没有冲突,则成功
- 如果发生冲突,则把数据分解成多个单元,再修改
- 如果修改单元发生冲突,则创建新的单元,或者单元数量加倍
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//先尝试修改变量base
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)) //再尝试修改单元值
longAccumulate(x, null, uncontended); //重新修改
}
}
使用:
adder.increment(); //增加
adder.sum(); //单元总值
(2)LongAccumulator
LongAdder只能实现整数的加法,而LongAccumulator可以实现任意函数
使用:
`LongAccumulator accumulator = new LongAccumulator(二元函数(接收两个long型参数),初始值);`
accumulator.accumulate(参数); //传入参数到二元函数,返回一个long值
7. 增强ConcurrentHashMap
- foreach:接口是一个Consumer或BigConsumer,用于对Map的数据进行消费
- reduce:foreach操作的function版本
- 条件插入computeIfAbsent:判断是否不存在,是则添加,保证线程安全
- search:查找第一个使得function返回不为null的值
- mappingCount:返回Map的条目总数(结果不一定准确)
- newKeySet:实现安全的HashSet
8. 发布和订阅模式
(1)反应式编程
用于处理异步流中的数据,每当应用收到数据项,便进行处理
- Publisher:发布者,将数据发布到流中
- onCompleted(): 没有数据
- onError(): 发生异常
- onSubscribe():订阅者注册后被调用的第一个方法
- onNext():下一个数据准备好
- Subscriber:接收者,从流中接收数据并处理
- request():请求数据个数
- cancel():停止接收新消息
import java.util.Arrays;
import java.util.concurrent.Flow.*;
import java.util.concurrent.SubmissionPublisher;
public class FlowDemo
{
public static void main(String[] args)
{
// 创建数据发布者
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
// 注册订阅者
MySubscriber<String> subscriber = new MySubscriber<>();
publisher.subscribe(subscriber);
//发布数据,发布完成关闭发布者
System.out.println("Publishing data items...");
String[] items = { "jan", "feb", "mar", "apr", "may", "jun",
"jul", "aug", "sep", "oct", "nov", "dec" };
Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
publisher.close();
//等待订阅者处理
try
{
synchronized("A")
{
"A".wait();
}
}
catch (InterruptedException ie)
{
}
}
}
class MySubscriber<T> implements Subscriber<T>
{
private Subscription subscription;
//注册时调用,开始请求数据
@Override
public void onSubscribe(Subscription subscription)
{
this.subscription = subscription;
subscription.request(1);
}
//请求数据,继续请求下一个
@Override
public void onNext(T item)
{
System.out.println("Received: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable t)
{
t.printStackTrace();
synchronized("A")
{
"A".notifyAll();
}
}
@Override
public void onComplete()
{
System.out.println("Done");
synchronized("A")
{
"A".notifyAll();
}
}
}
(2)数据处理链
最后
以上就是天真万宝路为你收集整理的并发编程学习笔记(五)——Java8、9、10的全部内容,希望文章能够帮你解决并发编程学习笔记(五)——Java8、9、10所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复