概述
高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)
缓存穿透
-
产生原因
指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是 数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义
-
风险
利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃
-
解决
null结果缓存,并加入短暂过期时间
public Map<String, List<Category2Vo>> getCatalogJson() { // 加入缓存逻辑 String catalogJson = redisTemplate.opsForValue().get("catalogJson"); if (StringUtils.isEmpty(catalogJson)) { // 查询数据库 Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb(); // 将数据转换为Json加入到缓存中 String jsonString = JSON.toJSONString(catalogJsonFromDb); // 解决缓存穿透问题 if (catalogJsonFromDb == null) { redisTemplate.opsForValue().set("catalogJson", "0"); } else { redisTemplate.opsForValue().set("catalogJson", jsonString); } return catalogJsonFromDb; } Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {}); return listMap; }
缓存雪崩
-
产生原因
缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。
-
解决
原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体 失效的事件。
public Map<String, List<Category2Vo>> getCatalogJson() { // 加入缓存逻辑 String catalogJson = redisTemplate.opsForValue().get("catalogJson"); if (StringUtils.isEmpty(catalogJson)) { // 查询数据库 Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb(); // 将数据转换为Json加入到缓存中 String jsonString = JSON.toJSONString(catalogJsonFromDb); // 解决缓存穿透问题 if (catalogJsonFromDb == null) { redisTemplate.opsForValue().set("catalogJson", "0"); } else { // 解决缓存雪崩问题 redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS); } return catalogJsonFromDb; } Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {}); return listMap; }
缓存击穿
-
产生原因
- 对于一些设置了过期时间的key,如果这些key可能会在某些 时间点被超高并发地访问,是一种非常“热点”的数据。
- 如果这个key在大量请求同时进来前正好失效,那么所有对 这个key的数据查询都落到db,我们称为缓存击穿。
-
解决:加锁
大量并发只让一个去查,其他人等待,查到以后释放锁,其他 人获取到锁,先查缓存,就会有数据,不用去db
本地锁
synchronized、JUC(Lock)
@Override
public Map<String, List<Category2Vo>> getCatalogJson() {
// 加入缓存逻辑
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isEmpty(catalogJson)) {
// 查询数据库
Map<String, List<Category2Vo>> catalogJsonFromDb = getCatalogJsonFromDb();
return catalogJsonFromDb;
}
Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
return listMap;
}
/**
* 从数据库中查询并封装分类数据
*
* @return
*/
public Map<String, List<Category2Vo>> getCatalogJsonFromDb() {
// 加锁:解决缓存击穿
// 只要是同一把锁,就能锁住需要这个锁的所有线程
// synchronized (this)
SpringBoot所有的组件在容器中都是单例的
synchronized (this) {
// 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询
String catalogJson = redisTemplate.opsForValue().get("catalogJson");
if (!StringUtils.isEmpty(catalogJson)) {
// 缓存不为空直接返回
Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() {});
return listMap;
}
List<CategoryEntity> selectList = baseMapper.selectList(null);
System.out.println("查询了数据库...");
// 获取全部一级分类
List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L);
// 封装数据
Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> {
// 查询二级分类
List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId());
List<Category2Vo> category2VoList = category2List.stream().map(item -> {
Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null);
// 查询三级分类
List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId());
List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> {
return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName());
}).collect(Collectors.toList());
category2Vo.setCatalog3List(category3VoList);
return category2Vo;
}).collect(Collectors.toList());
return category2VoList;
}));
// 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题
// 将数据转换为Json加入到缓存中
String jsonString = JSON.toJSONString(listMap);
// 解决缓存穿透问题
if (listMap == null) {
redisTemplate.opsForValue().set("catalogJson", "0");
} else {
// 结局缓存雪崩问题
redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS);
}
return listMap;
}
}
/**
* 根据父类ID查询出全部子分类
*
* @param selectList
* @param parentCid
* @return
*/
public List<CategoryEntity> getCategoryByParentCid(List<CategoryEntity> selectList, Long parentCid) {
List<CategoryEntity> entityList = selectList.stream().
filter(item -> item.getParentCid() == parentCid).
collect(Collectors.toList());
return entityList;
}
分布式锁
分布式锁演进
-
业务代码
private Map<String, List<Category2Vo>> getDataFromDb() { // 得到锁之后,我们需要再去缓存中确定一次,如果没有才需要继续查询 String catalogJson = redisTemplate.opsForValue().get("catalogJson"); if (!StringUtils.isEmpty(catalogJson)) { // 缓存不为空直接返回 Map<String, List<Category2Vo>> listMap = JSON.parseObject(catalogJson, new TypeReference<Map<String, List<Category2Vo>>>() { }); return listMap; } List<CategoryEntity> selectList = baseMapper.selectList(null); System.out.println("查询了数据库..."); // 获取全部一级分类 List<CategoryEntity> category1List = getCategoryByParentCid(selectList, 0L); // 封装数据 Map<String, List<Category2Vo>> listMap = category1List.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), v -> { // 查询二级分类 List<CategoryEntity> category2List = getCategoryByParentCid(selectList, v.getCatId()); List<Category2Vo> category2VoList = category2List.stream().map(item -> { Category2Vo category2Vo = new Category2Vo(item.getParentCid().toString(), item.getCatId().toString(), item.getName(), null); // 查询三级分类 List<CategoryEntity> category3List = getCategoryByParentCid(selectList, item.getCatId()); List<Category2Vo.Category3Vo> category3VoList = category3List.stream().map(item2 -> { return new Category2Vo.Category3Vo(item2.getParentCid().toString(), item2.getCatId().toString(), item2.getName()); }).collect(Collectors.toList()); category2Vo.setCatalog3List(category3VoList); return category2Vo; }).collect(Collectors.toList()); return category2VoList; })); // 需要在锁中进行缓存,否则会再释放锁的同时,有其他请求进来,导致查询了多次数据库,出现没有锁住的问题 // 将数据转换为Json加入到缓存中 String jsonString = JSON.toJSONString(listMap); // 解决缓存穿透问题 if (listMap == null) { redisTemplate.opsForValue().set("catalogJson", "0"); } else { // 结局缓存雪崩问题 redisTemplate.opsForValue().set("catalogJson", jsonString, new Random().nextInt(5) + 1, TimeUnit.DAYS); } return listMap; }
-
阶段一
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() { // 占分布式锁,去redis占坑 命令:set lock 111 NX Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111"); // 加锁成功...执行业务 if (lock) { // 执行业务代码 Map<String, List<Category2Vo>> dataFromDb = getDataFromDb(); redisTemplate.delete("lock");// 删除锁 return dataFromDb; } else { // 加锁失败...重试 // 休眠100ms重试 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock();// 自旋的方式 } }
-
问题
setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁
-
解决
设置锁的自动过期,即使没有删除,会自动删除
-
-
阶段二
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() { // 占分布式锁,去redis占坑 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111"); // 加锁成功...执行业务 if (lock) { // 设置过期时间 redisTemplate.expire("lock", 30, TimeUnit.SECONDS); // 执行业务代码 Map<String, List<Category2Vo>> dataFromDb = getDataFromDb(); redisTemplate.delete("lock");// 删除锁 return dataFromDb; } else { // 加锁失败...重试 // 休眠100ms重试 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock();// 自旋的方式 } }
-
问题
setnx设置好,正要去设置过期时间,宕机。又死锁了。
-
解决
设置过期时间和占位必须是原子的。redis支持使用setnx ex 命令
-
-
阶段三
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() { // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111", 300, TimeUnit.SECONDS); // 加锁成功...执行业务 if (lock) { // 执行业务代码 Map<String, List<Category2Vo>> dataFromDb = getDataFromDb(); redisTemplate.delete("lock");// 删除锁 return dataFromDb; } else { // 加锁失败...重试 // 休眠100ms重试 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock();// 自旋的方式 } }
-
问题
删除锁直接删除??? 如果由于业务时间很长,锁自己过期了,我们直接删除,有可能把别人正在持有的锁删除了。
-
解决
占锁的时候,值指定为uuid,每个人匹配是自己 的锁才删除。
-
-
阶段四
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() { // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); // 加锁成功...执行业务 if (lock) { // 执行业务代码 Map<String, List<Category2Vo>> dataFromDb = getDataFromDb(); // 判断是否为自己的锁 获取值对比+对比成功删除=原子操作 String lockValue = redisTemplate.opsForValue().get("lock"); if (uuid.equals(lockValue)) { redisTemplate.delete("lock");// 删除锁 } return dataFromDb; } else { // 加锁失败...重试 // 休眠100ms重试 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock();// 自旋的方式 } }
-
问题
如果正好判断是当前值,正要删除锁的时候,锁已经过期, 别人已经设置到了新的值。那么我们删除的是别人的锁
-
解决
删除锁必须保证原子性。使用redis+Lua脚本完成
-
-
阶段五-最终形态
保证加锁【占位+过期时间】和删除锁【判断+删除】的原子性。 更难的事情,锁的自动续期
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedisLock() { // 占分布式锁,去redis占坑 设置过期时间 必须和加锁是同步的,原子的 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); // 加锁成功...执行业务 if (lock) { System.out.println("获取分布式锁成功~~~"); Map<String, List<Category2Vo>> dataFromDb; try { // 执行业务代码 dataFromDb = getDataFromDb(); } finally { // 判断是否为自己的锁 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; // 删除锁 Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return dataFromDb; } else { // 加锁失败...重试 // 休眠100ms重试 try { Thread.sleep(200); } catch (Exception e) { } return getCatalogJsonFromDbWithRedisLock();// 自旋的方式 } }
分布式锁框架:Redisson
整合redisson作为分布式锁等功能框架
-
引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version> </dependency>
-
配置
/** * 所有对Redisson的使用都是通过RedissonClient对象 * * @return */ @Bean(destroyMethod = "shutdown") public RedissonClient redisson() { // 创建配置 Config config = new Config(); config.useSingleServer().setAddress("redis://127.0.0.1:6379"); // 根据config创建出RedissonClient实例 RedissonClient redisson = Redisson.create(config); return redisson; }
-
lock锁测试
- 情况一:不设置自动解锁时间
@ResponseBody @GetMapping("/hello") public String hello() { // 获取一把锁。只要锁的名字一样,就是同一把锁 RLock lock = redissonClient.getLock("my-lock"); // 加锁 lock.lock();// 阻塞式等待。默认加的锁都是30s时间。 try { System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId()); Thread.sleep(20000); } catch (Exception e) { } finally { // 解锁 System.out.println("释放锁..." + Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
问题
假设解锁代码没有运行,redisson会不会出现死锁?
- 锁的自动延期,如果业务超长,运行期间自动给锁续上新的30s。不用担心业务时间长,锁自动过期就会被删掉
- 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除。
- 情况二:设置了自动解锁时间
@ResponseBody @GetMapping("/hello") public String hello() { // 获取一把锁。只要锁的名字一样,就是同一把锁 RLock lock = redissonClient.getLock("my-lock"); // 加锁 lock.lock(10, TimeUnit.SECONDS);// 10秒自动解锁,自动解锁时间一定要大于业务的执行时间。 try { System.out.println("加锁成功,执行业务..." + Thread.currentThread().getId()); Thread.sleep(20000); } catch (Exception e) { } finally { // 解锁 System.out.println("释放锁..." + Thread.currentThread().getId()); lock.unlock(); } return "hello"; }
问题
lock.lock(10, TimeUnit.SECONDS);在锁时间到了以后,不会自动续期
-
如果我们传递了锁的超时时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
-
如果我们未指定锁的超时时间,就使用LockWatchdogTimeout看门狗的默认时间 30 * 1000;
只要占锁成功,就会启动一个定时任务【重新给锁设置过期时间,新的过期时间就是看门狗的默认时间】,每隔10s都会自动再次续期成30s。
续期时间:internalLockLeaseTime【看门狗时间】 / 3L,10s续期一次
-
最佳实战:
lock.lock(30, TimeUnit.SECONDS); 省掉了整个续期操作。手动解锁
-
读写锁测试
// 保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁)。读锁是一个共享锁 // 写锁没释放,读就必须等待 // 读 + 读:相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功 // 写 + 读:等待写锁释放 // 写 + 写:阻塞方式 // 读 + 写:有读锁,写也需要等待 // 只要有写的存在,都必须等待 @ResponseBody @GetMapping("/write") public String writeValue() { String s = ""; RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock"); RLock writeLock = lock.writeLock(); try { // 1、改数据加写锁,读数据加读锁 writeLock.lock(); s = UUID.randomUUID().toString(); Thread.sleep(20000); stringRedisTemplate.opsForValue().set("writeValue", s); } catch (Exception e) { e.printStackTrace(); } finally { // 解锁 writeLock.unlock(); } return s; } @ResponseBody @GetMapping("/read") public String readValue() { String s = ""; RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock"); lock.readLock().lock(); try { s = stringRedisTemplate.opsForValue().get("writeValue"); } catch (Exception e) { e.printStackTrace(); } finally { lock.readLock().unlock(); } return s; }
-
信号量
/** * 车库停车举例 * 停车 初始化缓存中park = 3 * 信号量也可以用作分布式限流。 * * @return * @throws InterruptedException */ @ResponseBody @GetMapping("/park") public String park() throws InterruptedException { RSemaphore park = redissonClient.getSemaphore("park"); // park.acquire();// 获取一个信号,获取一个值,占一个车位 阻塞的 boolean b = park.tryAcquire();// 非阻塞的 if (b) { // 执行业务 return "ok"; } return "error"; } /** * 从车位开走 * @return */ @GetMapping("/go") @ResponseBody public String go() { RSemaphore park = redissonClient.getSemaphore("park"); park.release();// 释放一个车位 return "ok"; }
-
闭锁
/** * 举例:放假锁门 * 假设有5个班,全部走完,才可以锁大门 */ @GetMapping("lockDoor") @ResponseBody public String lockDoor() throws InterruptedException { RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.trySetCount(5); door.await(); // 等待闭锁都完成 return "放假了..."; } @GetMapping("/go/{id}") @ResponseBody public String gogogo(@PathVariable("id") Long id) { RCountDownLatch door = redissonClient.getCountDownLatch("door"); door.countDown();// 计数减1 return id+"班的人都走了..."; }
-
使用Redisson解决分布式锁问题
public Map<String, List<Category2Vo>> getCatalogJsonFromDbWithRedissonLock() { // 1、锁的名字,锁的粒度,越细越快 // 锁的粒度:具体缓存的是某个数据,比如:11号商品: product-11-lock product-12-lock RLock lock = redisson.getLock("Catalog-lock"); lock.lock(); // 加锁成功...执行业务 Map<String, List<Category2Vo>> dataFromDb; try { // 执行业务代码 dataFromDb = getDataFromDb();// 该业务方法在上面 } finally { lock.unlock(); } return dataFromDb; }
最后
以上就是美满钢笔为你收集整理的高并发下缓存失效问题-缓存穿透、雪崩、击穿(使用本地锁和分布式锁Redisson解决)高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)的全部内容,希望文章能够帮你解决高并发下缓存失效问题-缓存穿透、雪崩、击穿(使用本地锁和分布式锁Redisson解决)高并发下缓存失效问题-缓存穿透、雪崩、击穿(本地锁和分布式锁Redisson)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复