概述
二、 ConcurrentHashMap----并发哈希映射
1.底层是基于数组+链表结构来存储数据
2.默认初始容量是16,默认加载因子是0.75,扩容的时倾默认每次增加一倍
hashmap线程安全但是非高并发
3.才用了分段(桶)锁机制。在后续的版本中,ConcurrentHashMap为了提高效率,在分段锁的基础上, 引入读写锁机制
a.读锁:允许多个线程读,不允许线程写
b.写锁:只允许一个线程写,不允许线程多
4.在JDK1.8中,引入了CAS(Compare And Swap,比较和交换)无锁算法保证线程安全性
5.从JDK1.8开始,如果一个桶中的元素个数超过了8个的时候,这个桶中的链表会扭转成一棵红黑树;如果不足7个,那么红黑树扭转回链表
6.红黑树:
a.红黑树本质上是一棵自平衡二叉查找树
b.二叉查找树的特点:
i.左子树都小于根
ii.右子树都大于根
c.红黑树的特点:
i.节点非红即黑
ii.根节点是黑节点
iii. 红节点的子节点一定是黑节点,但是黑节点的子节点不一定是
红节点
iv.最底层的叶子节点一定是黑色的空节点
v.从根节点到任意一个叶子节点所经过的路径中的黑色节点个数
一致的,即黑节点高度是一致
vi.新添的节点必须是红节点
d.红黑树的修正-前提一定是父子节点都为红–修正过程是一个链式的过程
i.叔父节点为红,那么将父节点以及叔父节点涂黑,将祖父节点
涂红
ii.叔父节点为黑,并且当前节点为右子叶,那么以当前节点为轴
进行左旋
iii.叔父节点为黑,并且当前节点为左子叶,那么以父节点为轴进
行右旋
修正案例:
e.红黑树的查询时间复杂度是o(logn)
三、ConcurrentNavigableMap-并发导航映射
1.提供了用于截取子映射的方法
2.一般使用的是实现类ConcurrentSkipListMap-并发跳跃表映射
3.跳跃表:
a.要求元素必须有序
b.针对原来的数据进行提取形成跳跃表,跳跃表可以向上一层层提取,但是最顶层的跳跃表中的元素至少有2个,一般是2-4个
c.跳跃表是典型的以空间换时间的产物
d.适用于查询多,增删少
e.如果新添节点,这个节点是否要提取到上一层跳跃表中,遵循"抛硬币"原则
f.跳跃表的查询时间复杂度是o(logn)
ConcurrentNavigableMap<String, Integer> map =
new ConcurrentskipListMap<>();
map.put( "a",3);map.put( "d",3);
map.put( "h",3);map.put( "w",3);
map.put( "j",3);map.put( "o",3);
map.put( "e",3);map.put( "s",3);
system.out.println(map );
system.out.printlh( map.subMap( "d","j"));//d到j
System.out.println(map.headMap( "o"));//
system.out.println(map.tailMap("h"));//h到尾
ExecutorService -执行器服务
一、概述
1.线程池的意义:减少服务器端的线程的创建和销毁,做到线程的重用
2.线程池在创建的时候需要定义一定数量的线程
3.每接受一个请求都会创建一个线程(核心线程)去处理这个请求
4.核心线程在处理完请求之后不会被销毁而是等待下一个请求过来
5.在核心线程达到数量之前,每次过来的请求都会去创建一个新的核心线程
6.如果所有的核心线程都被占用,那么后续的请求会被放入工作队列中。
7.如果工作队列也被全部占用,则线程池会创建临时线程去处理这个请求。如果临时线程执行完任务之后,存活指定的一段时间,如果这段时间内没有新的任务处理,则这个临时线程才会被kill掉。
8.如果临时线程也被全部占用,则新来的请求会交给
RejectedExecutionHandler -拒绝执行处理器处理
9.工作队列中的请求只会交给核心线程
package cn.tedu.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
public static void main(String[] args) {
//创建线程池
//创建线程池
// corePoolSize -核心线程数量
// maximumPoolSize -线程池大小=核心线程数+临时线程数
//keepAliveTime - 临时线程的存活时间
// unit - 时间单位
//workQueue -工作队列
//handler----拒绝执行处理器
ExecutorService es=
new ThreadPoolExecutor(
5, //5个核心线程
10,//5个核心线程+5个临时线程
5,TimeUnit.SECONDS, //临时线程用完之后存活5秒
new ArrayBlockingQueue<Runnable>(5),//工作队列
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝执行线程"+r);
}
});
//提交线程
for (int i = 0; i < 23; i++) {
es.execute(new EsRunnable());
}
//关闭线程池
es.shutdown();
}
}
class EsRunnable implements Runnable{
@Override
public void run() {
System.out.println("hello ~~~~");
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印结果
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
拒绝执行线程cn.tedu.threadpool.EsRunnable@232204a1
hello ~~~~
拒绝执行线程cn.tedu.threadpool.EsRunnable@4aa298b7
拒绝执行线程cn.tedu.threadpool.EsRunnable@7d4991ad
拒绝执行线程cn.tedu.threadpool.EsRunnable@28d93b30
hello ~~~~
拒绝执行线程cn.tedu.threadpool.EsRunnable@1b6d3586
拒绝执行线程cn.tedu.threadpool.EsRunnable@4554617c
拒绝执行线程cn.tedu.threadpool.EsRunnable@74a14482
拒绝执行线程cn.tedu.threadpool.EsRunnable@1540e19d
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
hello ~~~~
二、Callable
1.是JDK1.5提供的一个用于并发和返回结果的线程
2. Callable和Runnable的区别:
a.返回值: Runnable无返回值,Callable可以定义返回值
b.启动方式: Runnable可以通过Thread类或者是线程池来启动,但是Callable只能通过线程池启动
c.异常机制: Runnable不允许抛出异常,callable允许抛出异常,那么这就意味着callable如果报错可以利用全局方式处理
package cn.tedu.threadpool;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceDemo2 {
public static void main(String[] args) throws Exception, ExecutionException {
//特点:
//1.没有核心线程,都是临时线程
//2.临时线程的数量是Integer.MAX_VALUE
//可以认为这个线程池的线程数量是无限的
//3.临时线程用完之后允许存活1min
//4.工作队列是一个同步队列
//大线程小队列
//使用场景:高并发、短任务场景(发消息)
// ExecutorService es=Executors.newCachedThreadPool();
//特点:
//1.没有临时线程,都是核心线程
//2.工作队列是一个阻塞式链式队列,
//容量默认是Integer.MAX_VALUE
//可以认为可以存储无限多的任务
//小池子大队列
//使用场景:长任务(百度云盘下载任务好多个,下载中的是核心线程工作,
//等待下载的则是存储在队列中)
ExecutorService es2=Executors.newFixedThreadPool(5);
//将结果封装成Future对象,泛型表示的是实际结果的类型
Future<String> f = es2.submit(new CThread());
es2.shutdown();
System.out.println(f.get());
}
}
//泛型表示返回值的类型
//定义泛型为Void是无返回值
class CThread implements Callable<String>{
@Override
public String call() throws Exception {
return "SUCESS";
}
}
三、scheduledExecutorService 定时执行器服务器
1.可以实现定时效果
2.是很多定时器的底层机制
package cn.tedu.threadpool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class scheduledExecutorServiceDemo {
public static void main(String[] args) {
//创建定时线程池
ScheduledExecutorService ses=
Executors.newScheduledThreadPool(5);
//定时执行
// callable 要执行的线程
// delay 推迟的时间
//unit 时间单位
//在线程池启动之后,推迟5s执行这个线程
// ses.schedule(new ScheduleRunnable(), 5, TimeUnit.SECONDS);
//
//command ---
//initialDelay
//period
//unit
//每隔5s执行一次
//如果线程执行时间小于间隔时间,从上一次执行的起始开始计时 ---5s
//如果线程执行时间大于间隔时间,则按线程时间来
// ses.scheduleAtFixedRate(new ScheduleRunnable(), 0, 5, TimeUnit.SECONDS);
//
//从上一次执行的结束开始计时-----8s
ses.scheduleWithFixedDelay(new ScheduleRunnable(), 0, 5, TimeUnit.SECONDS);
}
}
class ScheduleRunnable implements Runnable{
@Override
public void run() {
System.out.println("hello ~~~~");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
四、ForkJoinPool -分叉含并池
1.分叉:将一个大任务拆分成多个小任务然后分布在不同的核上执行
合并:将分叉出去的线程的执行结果进行汇总
2.分叉合并能够有效的提高CPU的利用率
3.数据量越大,分叉合并相对循环而言的效率就越高
5.为了防止满任务导致整体效率降低,分叉合并采取了work-stealing (工
作窃取)策略-当一个核上的任务执行完成之后,这个核不会空闲而是去随机扫描一个核然后从这个核的任务列表的尾端来“偷”一个任务回来执行
package cn.tedu.threadpool;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo {
public static void main(String[] args) throws Exception, Exception {
long begin=System.currentTimeMillis();
//求1-100000000000L的和
long sum=0;
for (int i = 0; i < 100000000000L; i++) {
sum+=i;
}
System.out.println(sum);
// ForkJoinPool pool=new ForkJoinPool();
// Future<Long> f = pool.submit(new Sum(1,1000000000L));
// pool.shutdown();
// System.out.println(f.get());
long end=System.currentTimeMillis();
System.out.println("耗时"+(end-begin));
}
}
class Sum extends RecursiveTask<Long>{
private long start;
private long end;
public Sum(long start, long end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if(end-start<=10000){
long sum=0;
for (long i = start; i <=end; i++) {
sum+=i;
}
return sum;
}else{
long mid=(start+end)/2;
Sum left=new Sum(start,mid);
Sum right=new Sum(mid+1,end);
//分叉
left.fork();
right.fork();
//合并
return left.join()+right.join();
}
}
}
Lock-锁
一、概述
1.synchronized在使用的时候,需要关注锁对象。它的使用不够灵活
2. Lock是锁的顶级接口,所以使用的是实现类ReentrantLock
3.ReadWriteLock:读写锁。
3.ReadWriteLock:读写锁
a.读锁:允许多个线程读,但是不允许线程写
b.写锁:只允许一个线程写,但是不允许线程读
4.公平和非公平策略
a.非公平策略:在资源有限的情况下,有可能出现线程之间抢占的次数不均等的情况
b.公平策略:在资源有限的情况下,保证每一个线程执行的次数是基本均等的
c.非公平策略的效率更高
d. synchronized是非公平策略
e. Lock默认也是非公平的
package cn.tedu.lock;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockDemo {
static int i=0;
public static void main(String[] args) throws Exception {
Lock lock=new ReentrantLock();
new Thread(new Add(lock)).start();
new Thread(new Add(lock)).start();
Thread.sleep(5000);
System.out.println(i);
}
}
class Add implements Runnable{
private Lock lock;
public Add(Lock lock) {
this.lock=lock;
}
@Override
public void run() {
//加锁
lock.lock();
// synchronized (Add.class) {
for (int i = 0; i < 10000; i++) {
LockDemo.i++;
}
// }
//解锁
lock.unlock();
}
}
二、其他锁
1.CountDownLatch:闭锁/线程递减锁。对线程进行计数,当计数归零的时候放开阻塞
shift +alt+ a 列模式
package cn.tedu.lock;
import java.util.concurrent.CountDownLatch;
class CountDownLatchDemo {
public static void main(String[] args) throws Exception {
CountDownLatch cdl=new CountDownLatch(5);
new Thread(new Teacher(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
new Thread(new Student(cdl)).start();
//在计数归零之前陷入阻塞
cdl.await();
System.out.println("考试开始喽^^^");
}
}
class Teacher implements Runnable{
private CountDownLatch cdl;
public Teacher(CountDownLatch cdl) {
this.cdl=cdl;
}
@Override
public void run() {
System.out.println("监考老师到达考场~~~~~");
//减少一个计数
cdl.countDown();
}
}
class Student implements Runnable{
private CountDownLatch cdl;
public Student(CountDownLatch cdl) {
this.cdl=cdl;
}
@Override
public void run() {
System.out.println("考生到达考场~~~");
//减少一个计数
cdl.countDown();
}
}
2.CyclicBarrier:栅栏。对线程进行计数,当计数归零的时候放开阻塞
package cn.tedu.lock;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cb=new CyclicBarrier(4);
new Thread(new Runner(cb),"1").start();
new Thread(new Runner(cb),"2").start();
new Thread(new Runner(cb),"3").start();
new Thread(new Runner(cb),"4").start();
}
}
class Runner implements Runnable{
private CyclicBarrier cb;
public Runner(CyclicBarrier cb) {
this.cb=cb;
}
@Override
public void run() {
String name=Thread.currentThread().getName();
try {
Thread.sleep((long)(Math.random()*3000));
System.out.println(name+"号运动员到了起跑线~~~~");
//让当前线程陷入阻塞,同时减少一个计数
//当计数归零的时候,刚开阻塞
cb.await();
System.out.println(name+"号运动员跑出去了~~~~");
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.Exchanger:交换机。用于交换两个线程之间的信息
package cn.tedu.lock;
import java.util.concurrent.Exchanger;
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> ex=new Exchanger<>();
new Thread(new Producer(ex)).start();
new Thread(new Consumer(ex)).start();
}
}
class Producer implements Runnable{
private Exchanger<String> ex;
public Producer(Exchanger<String> ex) {
super();
this.ex = ex;
}
@Override
public void run() {
String info="商品";
//生产者将商品交换给消费者
try {
String msg = ex.exchange(info);
System.out.println("生产者收到消费者交换的"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
private Exchanger<String> ex;
public Consumer(Exchanger<String> ex) {
super();
this.ex = ex;
}
@Override
public void run() {
String info="钱";
//消费者把钱交换给生产者
String msg;
try {
msg = ex.exchange(info);
System.out.println("消费者收到消费者交换的"+msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.Semaphore:信号量。每一个线程可以取得一个信号量,当信号量被全
部占用之后,后来的线程就会被阻塞,直到有信号量被释放,那么阻塞的线程才能获取信号量俩继续执行。实际开发中,使用信号量来限流。【场景:直播间的人流量】
package cn.tedu.lock;
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore s=new Semaphore(5);
for (int i = 0; i < 8; i++) {
new Thread(new Table(s)).start();
}
}
}
class Table implements Runnable{
private Semaphore s;
public Table(Semaphore s) {
super();
this.s = s;
}
@Override
public void run() {
try {
//占一张桌子,信号量减一个
//当信号量为0.没桌子,后来的线程发生阻塞
s.acquire();
System.out.println("过来一群人占用了一张桌子~~~~");
Thread.sleep((long)Math.random()*10000);
//空出一张桌子,信号量加一
System.out.println("一拨人离开,空出一张桌子");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
二、原子性操作
1.这个属性在计算过程中,不会被其他线程抢占,保证属性的原子性
2.实际开发中,可以使用习惯的锁机制来代替
package cn.tedu.atomic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
//原子性整数
//对单个属性的锁机制
static AtomicInteger ai=new AtomicInteger(0);
public static void main(String[] args) throws Exception {
CountDownLatch cdl=new CountDownLatch(2);
new Thread(new Add(cdl)).start();
new Thread(new Add(cdl)).start();
cdl.await();
System.out.println(ai);
}
}
class Add implements Runnable{
private CountDownLatch cdl;
public Add(CountDownLatch cdl) {
super();
this.cdl = cdl;
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
AtomicIntegerDemo.ai.incrementAndGet();
}
cdl.countDown();
}
}
最后
以上就是风中老虎为你收集整理的高并发day02(Concurrent包)的全部内容,希望文章能够帮你解决高并发day02(Concurrent包)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复