概述
为什么需要分布式锁
现在类似Dubbo、Springcloud等分布式微服务的技术越来越火热,主要原因是单体项目难以支撑高流量、高访问量的项目。为了降低服务的负载压力,大多数情况下都会把服务拆分集群部署在不同的服务器上。在这种场景下使用常规的JVM锁(Lock/Sync)是有问题的,其他服务是无法感知当前JVM锁的状态的,JVM锁也不能锁其他服务。所以需要一个分布式锁,所有的服务都是用统一的锁。现在比较常用的分布式锁就是redis、zookeeper和mysql。下面我将使用redis实现分布式锁:
环境准备
为了模拟集群环境,我们打开两个应用使用nginx做负载均衡,启动redis设置stock=5000模拟库存。创建下单接口/stock/deduct,客户端每访问一次接口库存就-1。
案例图
项目依赖
<dependency>
<groupId>lyx.gencode</groupId>
<artifactId>gencode-mybatis</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!-- mp -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
controller
@RestController
@RequestMapping("/stock")
public class StockController {
@Autowired
private StockService stockService;
@Value("${server.port}")
private String port;
@RequestMapping("/deduct")
public String deduct(){
stockService.deduct();
return "hello,stock deduct-"+port;
}
}
redis分布式锁基本实现
扣减库存方法
/**
* redis分布式锁
*/
@Override
public void deduct(){
// 先拿到分布式锁
while(!redisTemplate.opsForValue().setIfAbsent("lock", "lock")){
// 线程随眠50毫秒
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果上锁成功就扣减库存
try{
// 获取库存并-1
String stock = redisTemplate.opsForValue().get("stock");
if(!StringUtils.isEmpty(stock)){
Integer count = Integer.valueOf(stock);
if(count > 0){
redisTemplate.opsForValue().set("stock",String.valueOf(count-1));
}
}
}finally {
redisTemplate.delete("lock");
}
}
上面的代码实现了加锁和释放锁的操作,但还是存在问题,可能会存在死锁问题。假如上锁成功后代码执行到一半系统宕机,没有执行delete操作。这把锁就会一直存在,后面的请求无法获取到锁一直while空转,最终可能会导致所有服务器宕机。所以,我们需要给这把锁加上过期时间。
加上过期时间防止释放别人的锁
为了防止死锁等问题,需要给锁加上过期时间。加入10秒内无解锁操作将自动解锁。
// 给锁加上10s的过期时间
Boolean lockres = redisTemplate.opsForValue().setIfAbsent("lock", "lock",10, TimeUnit.SECONDS);
那么问题又来了假如系统没有宕机,只是操作超时了,比如数据库连接用了11s,再执行释放锁会发生什么?
如下图,当操作超时后应用A的锁已经被自动释放了,最后还是会在finally代码块中执行释放锁的操作,但这把锁已经被另一个应用获取了,就会出现锁误删的情况。
加上UUID保证锁的唯一性
解铃还须系铃人!为了防止应用把别人的锁误删,我们还需要给每一把锁加上标识,在释放锁之前需要判断这把锁是否是自己的锁。如果是自己的锁才释放。这个问题很简单,只需要把锁的value设一个uuid,在finally代码块中对比value就可以了。
/**
* redis分布式锁
*/
@Override
public void deduct(){
// 先拿到分布式锁
String uuid= UUID.randomUUID().toString();
while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,10,TimeUnit.SECONDS)){
// 线程随眠50毫秒
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果上锁成功就扣减库存
try{
// 获取库存并-1
String stock = redisTemplate.opsForValue().get("stock");
if(!StringUtils.isEmpty(stock)){
Integer count = Integer.valueOf(stock);
if(count > 0){
redisTemplate.opsForValue().set("stock",String.valueOf(count-1));
}
}
}finally {
if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
redisTemplate.delete("lock");
}
}
}
lua脚本保证删锁原子性
上面的代码看似天衣无缝了,但还是存在问题。假如系统执行到if(uuid.equals(redisTemplate.opsForValue().get(“lock”)))
判断为true正准备释放锁时锁过期了,应用B上锁,还是会出现误删的情况。
所以删锁操作必须是原子性的。我们可以通过lua脚本来实现原子性删锁。
/**
* redis分布式锁
*/
@Override
public void deduct(){
// 先拿到分布式锁
String uuid= UUID.randomUUID().toString();
while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,10,TimeUnit.SECONDS)){
// 线程随眠50毫秒
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 如果上锁成功就扣减库存
try{
// 获取库存并-1
String stock = redisTemplate.opsForValue().get("stock");
if(!StringUtils.isEmpty(stock)){
Integer count = Integer.valueOf(stock);
if(count > 0){
redisTemplate.opsForValue().set("stock",String.valueOf(count-1));
}
}
}finally {
// 原子性解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +
" then " +
" return redis.call('del',KEYS[1])" +
" else " +
" return 0" +
" end";
Boolean lock = redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), arg);
}
}
}
最后
以上就是淡然鸡翅为你收集整理的redis实现分布式锁详解的全部内容,希望文章能够帮你解决redis实现分布式锁详解所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复