概述
在读写更新数据库后,我们一般都只是简单更新一下redis缓存:
@Transactional
public Product create(Product product){
Product prductResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+prductResult.getId(), JSON.toJSONString(prductResult));
return prductResult;
}
@Transactional
public Product update(Product product){
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(),JSON.toJSONString(productResult));
return productResult;
}
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
product = JSON.parseObject(productStr, Product.class);
return product;
}
//缓存没有,从数据库拿
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product));
}
return product;
}
这种简单的操作在高并发场景下就会出现许多问题,下面就对可能会出现的问题提出一些见解:
1.冷热数据
有的数据每天访问是热门的,访问次数很多,但也有10%左右的数据是冷门数据,对于这种冷门数据没必要让他一直待在redis缓存里面占用资源,而对于热门数据希望他在缓存里面多待一点,因此对于存放缓存的代码可以设置超时时间,并且设置一个缓存延期,实现数据冷热分离
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 24;
于是代码可改成:
@Transactional
public Product create(Product product){
Product prductResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+prductResult.getId(),
JSON.toJSONString(prductResult));
return prductResult;
}
@Transactional
public Product update(Product product){
Product productResult = productDao.update(product);
//更新缓存,设置超时时间redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(),JSON.toJSONString(productResult),PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
return productResult;
}
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,PRODUCT_CACHE_TIMEOUT,TimeUnit.SECONDS);
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存,设置超时时间
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),
JSON.toJSONString(product),PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return product;
}
2.缓存击穿问题:
对于一些批量上架的商品,一般都放在redis缓存里面,然后给他们统一设置超时时间,比如10分钟,但是如果在十分钟这一时刻,超时时间到了,这些缓存就会被清除掉,而如果此时,正好有几十W个请求过来,这些请求都会先查询redis缓存,但是缓存里面此时已经没有了,那么这些请求就会直接击穿redis缓存访问数据库,这样就会给数据库带来很大的压力,有可能扛不住。
对于这种情况,我们可以将超时时间设置成随机的,不让他们同时失效。
private Integer getProductCacheTimeout() {
//设置随机超时时间解决缓存批量失效导致的缓存击穿问题
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
于是我们设置超时时间的代码就可改成:
@Transactional
public Product update(Product product){
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(),JSON.toJSONString(productResult),getProductCacheTimeout(), TimeUnit.SECONDS);
return productResult;
}
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
3.缓存穿透问题:
在JD或者淘宝上面,每一个商品都有自己对应的一个商品ID,而我们知道了这个ID,我们就可以通过某些软件,每秒发送几十万不存在的商品ID去发起请求,而对于这些请求,缓存里面是没有的,那么就可以直接穿透缓存到数据库,让数据库崩溃。
对于这种情况,我们可以在数据库查到空值时,在缓存里面设置一个空缓存,这样其他请求访问redis缓存的时候就会获得一个空的值,但是这样又会有一个问题,就是可能会导致redis里面空缓存过多,因此我们可以为空缓存设置超时时间,并且为空缓存设置一个延期,防止一直攻击。
public static final String EMPTY_CACHE="{}";
private Integer genEmptyCacheTimeout() {
//设置空缓存随机超时时间
return 60 + new Random().nextInt(30);
}
于是查询代码可改成
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
return product;
}
除此之外,我们还可以使用BloomFilter布隆过滤器来解决这种问题
原理:底层是维护了一个byte二进制数组,通过它,每次往数据库里面加数据时,可以放入布隆过滤器里面(bloomFilter.add("key");),然后布隆过滤器会根据传入的值,进行hash运算,对二进制数组的长度取模,得到的值就是对应的该数组下标,然后数组该位置的值置为1,当下次在查询数据时,就可以通过布隆过滤器来查询数据库有没有这个值((bloomFilter.contains("key"))),如果布隆过滤器返回false则说明数据库里面没有,就可以直接返回一个空值了,而没有必要再去查询缓存和数据库了。
使用布隆过滤器我们需要引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
通过redisson创建并且对其初始化(一个字节占8个byte,对于布隆过滤器而言,存放几亿的值byte数组只占用几十M的内存)
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("keyList");
//初始化布隆过滤器:预计元素个数为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小
bloomFilter.tryInit(100000000L,0.03);
4.缓存雪崩问题
平时可能还会遇到这种情况,就是很多请求访问redis缓存,某一时刻,redis扛不住了突然挂掉了,而这时候每秒又有几十万甚至几百万的请求过来,这些请求直接到达数据库,可能会导致数据库也奔溃了,而数据库奔溃了,导致服务器奔溃,服务器崩溃又导致前端奔溃进而导致整个网页也崩溃了。
对于这种问题:
a.首先从redis上面来说我们可以对其进行限流,比如原本redis每次允许的最大请求是10万,我们可以将其限制到允许的请求为8w。超过这个请求可以让其陷入等待,这时我们可以设置一个队列,将其他请求放入队列里面进行等待,然后可以前端给出一个友好提示:当前业务过忙,请稍后。
b.对redis可以建立主从架构,哨兵架构以及集群架构,并且不要在单机上面建立,而是建立在多台机器上面,这样就可以避免单机挂了redis也挂了的问题。
c.建立多级缓存架构,在Nginx上设置一层缓存,在JVM层面也设置缓存交给JVM处理,对于JVM而言,是可以每秒处理几十万几百万数据的,比如可以建立一个Map来存放数据,每次访问时先看该map里面有没有。
5.突发性热点缓存重建问题:
平时可能会有这种情况,就是原本很冷门很少有人访问的数据突然几十万几百万人访问。比如说:微博上面某个拥有几亿粉丝的明星或者某拥有几千万粉丝的主播,某一天,他们突然说了一句某某商品很好,而这种商品可能之前是冷门数据很水有人去关心,而此时,他们的粉丝可能就会有几百万人去访问这个商品,但是对于这种冷门商品来说,redis缓存里面是没有的,这样又会导致这几百万的访问直接到达数据库,使得数据库压力突增。
对于这种问题我们可以建立双重检测机制:比如加一个JVM层面的synchronized锁,保证只有一个请求访问数据库,其他请求陷入阻塞等待状态,等该请求访问完数据库后,将数据写入缓存里面,这样其他请求就可以从缓存里面拿到数据了。
这样代码又可以改为:
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//加锁进行双重检测,解决突发性热点缓存问题
synchronized (this){
productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
}
return product;
}
但是这种JVM层面的锁存在一个问题:就是它加锁时,其他请求是进不来的,比如:A商品来的时候加了锁,此时又来了个B商品,但是B商品却因为A商品而被阻塞住了,这种情况原本是不应该出现的。
此时我们可以使用redisson里面的分布式锁。
public static final String LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX="lock:product:hot_cache_create:";
RLock hotProductCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotProductCacheLock.lock();
于是代码改成:
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//加分布式锁,解决突发性热点缓存重建问题
RLock hotProductCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotProductCacheLock.lock();
try {
productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
}finally {
hotProductCacheLock.unlock();
}
return product;
}
对代码进行重构一下,将从缓存里面拿数据的重复代码抽取成一个方法
private Product getProductFromCache(String productCacheKey) {
Product product=null;
//先从缓存拿,拿到直接返回
String productStr = redisUtil.get(productCacheKey);
if (!StringUtils.isEmpty(productStr)){
if (EMPTY_CACHE.equals(productStr)){
redisUtil.expire(productCacheKey,getEmptyCacheTimeout(),TimeUnit.SECONDS);
return new Product();
}
product = JSON.parseObject(productStr, Product.class);
//每从缓存读取一个数据就进行延期
redisUtil.expire(productCacheKey,getProductCacheTimeout(),TimeUnit.SECONDS);
}
return product;
}
重构后的代码:
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//加分布式锁,解决突发性热点缓存重建问题
RLock hotProductCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotProductCacheLock.lock();
try {
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//缓存没有
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
}finally {
hotProductCacheLock.unlock();
}
return product;
}
6.缓存与数据库双写不一致问题:
a.线程1对数据库中某个字段比如balance,将其修改为balance=100,此时其应该将修改后的值写入redis缓存中,而正当其准备写入缓存时,由于某种原因卡了一小会,此时线程2也将数据库中balance进行修改balance=50,然后将balance=50写入缓存里面,这时线程1恢复了过来,将balance=100写入缓存中,这时就覆盖了线程2写入缓存中的数据,此时缓存中的balance是100,但是数据库中的值是线程2修改后的值balance=50,此时就出现了缓存与数据库双写不一致问题。
b.有的人说解决这种情况,只需要每次更新数据库的时候删除掉缓存就行了,但只是删除掉缓存,还是会出现这种情况:比如线程1将数据库中的balance修改为balance=100,然后删除掉缓存,此时缓存为空,这时线程2进行查询,先查缓存为空,然后查询数据库,得到balance=100的数据,这时他应该将数据写入缓存中,可是当其正要写入缓存时,又由于某种原因卡住了,此时线程3过来将数据库中的balance修改为balance=50,然后删除缓存,此时线程2恢复了过来,将其刚刚查询到的balance=100写入缓存里面,但是此时数据库中的值是线程2刚刚修改过的balance=50,这时又出现了缓存与数据库双写不一致问题。
这种问题归根结底都是因为在高并发的情况下读写数据库而出现的问题,找到了根源,我们就可以在要对数据库进行读写数据库的时候加一个分布式锁,在写完缓存的时候释放锁,这样其他线程想要对数据库进行操作时就会被阻塞住。
public static final String LOCK_PRODUCT_UPDATE_PREFIX="lock:product:update:";
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
productUpdateLock.lock();
此时代码改为:
@Transactional
public Product create(Product product){
Product productResult=null;
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
productUpdateLock.lock();
try {
productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(), JSON.toJSONString(productResult));
}finally {
productUpdateLock.unlock();
}
return productResult;
}
@Transactional
public Product update(Product product){
Product productResult=null;
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
productUpdateLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(),JSON.toJSONString(productResult),getProductCacheTimeout(), TimeUnit.SECONDS);
}finally {
productUpdateLock.unlock();
}
return productResult;
}
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//加分布式锁,解决突发性热点缓存重建问题
RLock hotProductCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotProductCacheLock.lock();
try {
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//缓存没有
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
productUpdateLock.lock();
try {
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
}finally {
productUpdateLock.unlock();
}
}finally {
hotProductCacheLock.unlock();
}
return product;
}
大多数情况下,一般都是读多写少的情况,对于这种情况,我们完全可以使用轻一点的锁,读写锁,读取数据的时候加读锁,写数据的时候加写锁,对于读操作之间是不互斥的,只有读写写读的情况下才会互斥。而redisson中就提供了读写锁机制redisson.getReadWriteLock();
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
RLock rLock = productUpdateLock.readLock();
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + product.getId());
RLock writeLock = productUpdateLock.writeLock();
此时代码可以修改如下:
@Transactional
public Product update(Product product){
Product productResult=null;
//
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
//
productUpdateLock.lock();
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + product.getId());
RLock writeLock = productUpdateLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+productResult.getId(),JSON.toJSONString(productResult),getProductCacheTimeout(), TimeUnit.SECONDS);
}finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId){
Product product =null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//加分布式锁,解决突发性热点缓存重建问题
RLock hotProductCacheLock = redisson.getLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
hotProductCacheLock.lock();
try {
product = getProductFromCache(productCacheKey);
if (product!=null){
return product;
}
//缓存没有
//
RLock productUpdateLock = redisson.getLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RReadWriteLock productUpdateLock = redisson.getReadWriteLock(LOCK_PRODUCT_HOT_CACHE_CREATE_PREFIX + productId);
RLock rLock = productUpdateLock.readLock();
rLock.lock();
try {
product = productDao.get(productId);
if (product!=null){
//更新缓存
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE+product.getId(),JSON.toJSONString(product),getProductCacheTimeout(), TimeUnit.SECONDS);
}else {
redisUtil.set(productCacheKey,EMPTY_CACHE,getEmptyCacheTimeout(),TimeUnit.SECONDS);
}
}finally {
rLock.unlock();
}
}finally {
hotProductCacheLock.unlock();
}
return product;
对于上面的分布式锁还可以优化:我们如果可以估测某个线程的执行时间比如1s内就能执行完操作,此时我们就可以用hotProductCacheLock.tryLock(1,TimeUnit.SECONDS);这把锁来优化,就是比如有几十万的线程正在等待,当等待1s之后,tryLock锁就会失效,这些线程就会并发执行,就没有必要再像分布式锁那样执行很多次加锁解锁的逻辑,这种情况就是分布式锁串行传并发,大大提高性能,但是前提是我们得能预估到第一个加锁的线程的执行时间。
最后
以上就是现代摩托为你收集整理的Redis高并发缓存架构之缓存穿透,击穿,雪崩、热点数据重建及缓存与数据库双写不一致问题及解决方案的全部内容,希望文章能够帮你解决Redis高并发缓存架构之缓存穿透,击穿,雪崩、热点数据重建及缓存与数据库双写不一致问题及解决方案所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复