我是靠谱客的博主 美满钢笔,最近开发中收集的这篇文章主要介绍高并发下缓存失效问题-缓存穿透、雪崩、击穿(使用本地锁和分布式锁Redisson解决)高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)

缓存穿透

  • 产生原因

    指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是 数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

  • 风险

    利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃

  • 解决

    null结果缓存,并加入短暂过期时间

    public Map<String, List<Category2Vo>> getCatalogJson() {
    // 加入缓存逻辑
    String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
    // 查询数据库
    Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
    // 将数据转换为Json加入到缓存中
    String jsonString = JSON.toJSONString(catalogJsonFromDb);
    // 解决缓存穿透问题
    if (catalogJsonFromDb == null) {
    redisTemplate.opsForValue().set("catalogJson", "0");
    } else {
    redisTemplate.opsForValue().set("catalogJson", jsonString);
    }
    return catalogJsonFromDb;
    }
    Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
    return listMap;
    }
    

缓存雪崩

  • 产生原因

    缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

  • 解决

    原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。

    public Map<String, List<Category2Vo>> getCatalogJson() {
    // 加入缓存逻辑
    String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    if (StringUtils.isEmpty(catalogJson)) {
    // 查询数据库
    Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
    // 将数据转换为Json加入到缓存中
    String jsonString = JSON.toJSONString(catalogJsonFromDb);
    // 解决缓存穿透问题
    if (catalogJsonFromDb == null) {
    redisTemplate.opsForValue().set("catalogJson", "0");
    } else {
    // 解决缓存雪崩问题
    redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
    }
    return catalogJsonFromDb;
    }
    Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
    return listMap;
    }
    

缓存击穿

  • 产生原因

    1. 对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
    2. 如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
  • 解决:加锁

    大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db

本地锁

synchronized、JUC(Lock)

@Override
public Map<String, List<Category2Vo>> getCatalogJson() {
// 加入缓存逻辑
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isEmpty(catalogJson)) {
// 查询数据库
Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
return catalogJsonFromDb;
}
Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
return listMap;
}
/**
* 从数据库中查询并封装分类数据
*
* @return
*/
public Map<String, List<Category2Vo>> getCatalogJsonFromDb() {
// 加锁:解决缓存击穿
// 只要是同一把锁,就能锁住需要这个锁的所有线程
// synchronized (this)
SpringBoot所有的组件在容器中都是单例的
synchronized (this) {
// 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if (!StringUtils.isEmpty(catalogJson)) {
// 缓存不为空直接返回
Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
return listMap;
}
List<CategoryEntity> selectList = baseMapper.selectList(null);
System.out.println("查询了数据库...");
// 获取全部一级分类
List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L);
// 封装数据
Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> {
// 查询二级分类
List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId());
List<Category2Vo> category2VoList = category2List.stream().map(item -> {
Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null);
// 查询三级分类
List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId());
List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> {
return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName());
}).collect(Collectors.toList());
category2Vo.setCatalog3List(category3VoList);
return category2Vo;
}).collect(Collectors.toList());
return category2VoList;
}));
// 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题
// 将数据转换为Json加入到缓存中
String jsonString = JSON.toJSONString(listMap);
// 解决缓存穿透问题
if (listMap == null) {
redisTemplate.opsForValue().set("catalogJson", "0");
} else {
// 结局缓存雪崩问题
redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
}
return listMap;
}
}
/**
* 根据父类ID查询出全部子分类
*
* @param selectList
* @param parentCid
* @return
*/
public List<CategoryEntity> getCategoryByParentCid(List<CategoryEntity> selectList, Long parentCid) {
List<CategoryEntity> entityList = selectList.stream().
filter(item -> item.getParentCid() == parentCid).
collect(Collectors.toList());
return entityList;
}

分布式锁

分布式锁演进

  • 业务代码

    private Map<String, List<Category2Vo>> getDataFromDb() {
    // 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询
    String catalogJson = redisTemplate.opsForValue().get("catalogJson");
    if (!StringUtils.isEmpty(catalogJson)) {
    // 缓存不为空直接返回
    Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {
    });
    return listMap;
    }
    List<CategoryEntity> selectList = baseMapper.selectList(null);
    System.out.println("查询了数据库...");
    // 获取全部一级分类
    List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L);
    // 封装数据
    Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> {
    // 查询二级分类
    List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId());
    List<Category2Vo> category2VoList = category2List.stream().map(item -> {
    Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null);
    // 查询三级分类
    List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId());
    List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> {
    return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName());
    }).collect(Collectors.toList());
    category2Vo.setCatalog3List(category3VoList);
    return category2Vo;
    }).collect(Collectors.toList());
    return category2VoList;
    }));
    // 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题
    // 将数据转换为Json加入到缓存中
    String jsonString = JSON.toJSONString(listMap);
    // 解决缓存穿透问题
    if (listMap == null) {
    redisTemplate.opsForValue().set("catalogJson", "0");
    } else {
    // 结局缓存雪崩问题
    redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
    }
    return listMap;
    }
    
  • 阶段一

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 占分布式锁,去redis占坑 命令:set lock 111 NX
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    // 加锁成功...执行业务
    if (lock) {
    // 执行业务代码
    Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
    redisTemplate.delete("lock");// 删除锁
    return dataFromDb;
    } else {
    // 加锁失败...重试
    // 休眠100ms重试
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
    }
    }
    
    • 问题

      setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁

    • 解决

      设置锁的自动过期,即使没有删除,会自动删除

  • 阶段二

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 占分布式锁,去redis占坑
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
    // 加锁成功...执行业务
    if (lock) {
    // 设置过期时间
    redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
    // 执行业务代码
    Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
    redisTemplate.delete("lock");// 删除锁
    return dataFromDb;
    } else {
    // 加锁失败...重试
    // 休眠100ms重试
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
    }
    }
    
    • 问题

      setnx设置好,正要去设置过期时间,宕机。又死锁了。

    • 解决

      设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令

  • 阶段三

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS);
    // 加锁成功...执行业务
    if (lock) {
    // 执行业务代码
    Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
    redisTemplate.delete("lock");// 删除锁
    return dataFromDb;
    } else {
    // 加锁失败...重试
    // 休眠100ms重试
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
    }
    }
    
    • 问题

      删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。

    • 解决

      占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。

  • 阶段四

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
    String uuid = UUID.randomUUID().toString();
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
    // 加锁成功...执行业务
    if (lock) {
    // 执行业务代码
    Map<String, List<Category2Vo>> dataFromDb = getDataFromDb();
    // 判断是否为自己的锁 获取值对比+对比成功删除=原子操作
    String lockValue = redisTemplate.opsForValue().get("lock");
    if (uuid.equals(lockValue)) {
    redisTemplate.delete("lock");// 删除锁
    }
    return dataFromDb;
    } else {
    // 加锁失败...重试
    // 休眠100ms重试
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
    }
    }
    
    • 问题

      如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁

    • 解决

      删除锁必须保证原子性。使用redis+Lua脚本完成

  • 阶段五-最终形态

    保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期

     public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() {
    // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的
    String uuid = UUID.randomUUID().toString();
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
    // 加锁成功...执行业务
    if (lock) {
    System.out.println("获取分布式锁成功~~~");
    Map<String, List<Category2Vo>> dataFromDb;
    try {
    // 执行业务代码
    dataFromDb = getDataFromDb();
    } finally {
    // 判断是否为自己的锁
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    // 删除锁
    Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
    }
    return dataFromDb;
    } else {
    // 加锁失败...重试
    // 休眠100ms重试
    try {
    Thread.sleep(200);
    } catch (Exception e) { }
    return getCatalogJsonFromDbWithRedisLock();// 自旋的方式
    }
    }
    

分布式锁框架:Redisson

整合redisson作为分布式锁等功能框架

  • 引入依赖

    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
    </dependency>
    
  • 配置

     /**
    * 所有对Redisson的使用都是通过RedissonClient对象
    *
    * @return
    */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redisson() {
    // 创建配置
    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    // 根据config创建出RedissonClient实例
    RedissonClient redisson = Redisson.create(config);
    return redisson;
    }
    
  • lock锁测试

    • 情况一:不设置自动解锁时间
    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
    // 获取一把锁。只要锁的名字一样,就是同一把锁
    RLock lock = redissonClient.getLock("my-lock");
    // 加锁
    lock.lock();// 阻塞式等待。默认加的锁都是30s时间。
    try {
    System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
    Thread.sleep(20000);
    } catch (Exception e) {
    } finally {
    // 解锁
    System.out.println("释放锁..." + Thread.currentThread().getId());
    lock.unlock();
    }
    return "hello";
    }
    

    问题

    假设解锁代码没有运行,redisson会不会出现死锁

    1. 锁的自动延期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期就会被删掉
    2. 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
    • 情况二:设置了自动解锁时间
    @ResponseBody
    @GetMapping("/hello")
    public String hello() {
    // 获取一把锁。只要锁的名字一样,就是同一把锁
    RLock lock = redissonClient.getLock("my-lock");
    // 加锁
    lock.lock(10, TimeUnit.SECONDS);// 10秒自动解锁,自动解锁时间一定要大于业务的执行时间。
    try {
    System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId());
    Thread.sleep(20000);
    } catch (Exception e) {
    } finally {
    // 解锁
    System.out.println("释放锁..." + Thread.currentThread().getId());
    lock.unlock();
    }
    return "hello";
    }
    

    问题

    lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期

    1. 如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间

    2. 如果我们未指定锁的超时时间,就使用LockWatchdogTimeout看门狗的默认时间 30 * 1000;

      只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期成30s。

      续期时间:internalLockLeaseTime【看门狗时间】 / 3L,10s续期一次

    3. 最佳实战:

      lock.lock(30, TimeUnit.SECONDS); 省掉了整个续期操作。手动解锁

  • 读写锁测试

    // 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁)。读锁是一个共享锁
    // 写锁没释放,读就必须等待
    // 读 + 读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
    // 写 + 读:等待写锁释放
    // 写 + 写:阻塞方式
    // 读 + 写:有读锁,写也需要等待
    // 只要有写的存在,都必须等待
    @ResponseBody
    @GetMapping("/write")
    public String writeValue() {
    String s = "";
    RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
    RLock writeLock = lock.writeLock();
    try {
    // 1、改数据加写锁,读数据加读锁
    writeLock.lock();
    s = UUID.randomUUID().toString();
    Thread.sleep(20000);
    stringRedisTemplate.opsForValue().set("writeValue", s);
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    // 解锁
    writeLock.unlock();
    }
    return s;
    }
    @ResponseBody
    @GetMapping("/read")
    public String readValue() {
    String s = "";
    RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
    lock.readLock().lock();
    try {
    s = stringRedisTemplate.opsForValue().get("writeValue");
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    lock.readLock().unlock();
    }
    return s;
    }
    
  • 信号量

    
    /**
    * 车库停车举例
    * 停车 初始化缓存中park = 3
    * 信号量也可以用作分布式限流。
    *
    * @return
    * @throws InterruptedException
    */
    @ResponseBody
    @GetMapping("/park")
    public String park() throws InterruptedException {
    RSemaphore park = redissonClient.getSemaphore("park");
    //
    park.acquire();// 获取一个信号,获取一个值,占一个车位
    阻塞的
    boolean b = park.tryAcquire();// 非阻塞的
    if (b) {
    // 执行业务
    return "ok";
    }
    return "error";
    }
    /**
    * 从车位开走
    * @return
    */
    @GetMapping("/go")
    @ResponseBody
    public String go() {
    RSemaphore park = redissonClient.getSemaphore("park");
    park.release();// 释放一个车位
    return "ok";
    }
    
  • 闭锁

    /**
    * 举例:放假锁门
    * 假设有5个班,全部走完,才可以锁大门
    */
    @GetMapping("lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
    RCountDownLatch door = redissonClient.getCountDownLatch("door");
    door.trySetCount(5);
    door.await(); // 等待闭锁都完成
    return "放假了...";
    }
    @GetMapping("/go/{id}")
    @ResponseBody
    public String gogogo(@PathVariable("id") Long id) {
    RCountDownLatch door = redissonClient.getCountDownLatch("door");
    door.countDown();// 计数减1
    return id+"班的人都走了...";
    }
    
  • 使用Redisson解决分布式锁问题

    public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedissonLock() {
    // 1、锁的名字,锁的粒度,越细越快
    // 锁的粒度:具体缓存的是某个数据,比如:11号商品: product-11-lock product-12-lock
    RLock lock = redisson.getLock("Catalog-lock");
    lock.lock();
    // 加锁成功...执行业务
    Map<String, List<Category2Vo>> dataFromDb;
    try {
    // 执行业务代码
    dataFromDb = getDataFromDb();// 该业务方法在上面
    } finally {
    lock.unlock();
    }
    return dataFromDb;
    }
    

最后

以上就是美满钢笔为你收集整理的高并发下缓存失效问题-缓存穿透、雪崩、击穿(使用本地锁和分布式锁Redisson解决)高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)的全部内容,希望文章能够帮你解决高并发下缓存失效问题-缓存穿透、雪崩、击穿(使用本地锁和分布式锁Redisson解决)高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(49)

评论列表共有 0 条评论

立即
投稿
返回
顶部