概述
前言
大家好,我是飓风
上一遍文章 分布式锁的实现- mysql,我们讲解了分布式锁实现的特性,主要包含:
- 互斥性
- 超时特性
- 提供阻塞和非阻塞接口
- 可重入性
- 公平锁和非公平锁
- 其他 高性能 高可用等
今天咱们来看看用zookeeper 怎么来实现分布式锁。
实现
环境准备
- zookeeper 3.4.11
- jdk 8
- spring-boot 2.3.2.RELEASE
- curator-framework 4.0.0
jdk 安装
这里就不介绍jdk安装了,相信大家肯定google 或者百度都可以查到,很简单,略过。
zookeeper 安装
这里我们利用docker 来快速安装和启动
安装:
docker pull zookeeper:3.4.11
启动:
docker run --name zookeeper --restart always -d zookeeper:3.4.11
创建maven 项目
这里创建maven 项目就省略了,下面的maven的依赖配置,具体的版本号在我的父pom里,完成代理,我会传到github 上。
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>org.slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
</dependency>
</dependencies>
实现原理
zookeeper 的特性
- 结构简单类,似于文件系统的树状结构
- 单系统镜像,无论客户端连接到哪一个服务器,他将看到相同的、Zookeeper视图
- 有序性, 有序的事务编号,客户端的更新顺序与它们被发送的顺序相一致
- 原子性, 更新操作要么成功要么失败,没有第三种结果
zookeeper的节点
- 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识
- znode有三种类型,临时的( EPHEMERAL )、持久的( PERSISTENT )和有序的 (SEQUENTIAL)
- znode的类型在创建时确定并且之后不能再修改znode可以包含数据和子节点,但是EPHEMERAL类型的节点不能有子节点
- znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本
- 节点不支持部分读写,而是一次性完整读写
- 短暂znode的客户端会话结束时,zookeeper会将该短暂znode删除,短暂znode不可以有子节点
- 持久znode不依赖于客户端会话,只有当客户端明确要删除该持久znode时才会被删除
- 客户端应用可以在节点上设置监视器
zookeeper watcher
Watcher 在 zookeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应。
公共创建相同的临时节点方式
通过上面的zookeeper的介绍,我们知道,zookeeper 是读写是原子性的,且节点不能重复创建的,那么我们就让客户端,在获取分布式锁的时候,去创建这个临时节点,先创建的client ,那么就会返回创建成功,其他都会返回创建失败,其他创建失败,此时需要监听这个 临时节点,如果临时节点被删除了,那么说明就是释放锁了,其他client 可以接着创建这个临时节点,来争抢分布式锁。
之所以会利用临时节点,如果程序down掉了,那么此时临时节点就会自动删除,不会出现死锁的现象。
举例:比如我们进行某个SKU库存的扣减,那么此时zookeeper 的创建节点的路径咱们可以设置为 /lock/sku/100121212
, 其中 100121212
就是要扣减的sku的值,也是一个临时节点。
下面我画个图,看完相信会更加清晰。
1.创建临时节点,也就是开始获取锁,如下图:
2.client1 获取锁成功,如下图:
3.此时client1 获取锁成功,其他client2 和client3 获取锁失败
接着client2 和 client3 开始监听 这个临时节点是否被删除了,如下图:
4.client1 执行完扣减库存业务,那么就会删除临时节点,也就是释放锁,那么其他client2 和client3 监听到这个临时节点被删除了,那么就会再次进行锁的获取,也就是创建这个临时节点了。
通过上面这个几个步骤,一个基于zookeeper临时节点的分布式锁就实现了。但是这里有些问题需要说明下:
当大量客户端去竞争锁的时候,会发生“惊群”效应,这里惊群效应指的是在分布式锁竞争的过程中,大量的"Watcher通知"和“创建/lock/sku/xxxx”两个操作重复运行,并且绝大多数运行结果都创建节点失败,从而继续等待下一次通知,若在集群规模较大的情况下,会对ZooKeeper服务器以及客户端服务器造成巨大的性能影响和网络冲击,所以基于这种方式的实现,并发量上支持不很高,大流量下不建议使用。
下面我来介绍改进方案
基于zookeeper的临时顺序节点方式
临时顺序节点原理
我们可以利用创建zookeeper的临时顺序节点的方式,来解决“惊群”效应,其实是一种公平锁的实现,下面说下具体的步骤:
- 使用 zookeeper 的临时节点和有序节点,每个线程获取锁就是在 ZK 创建一个临时有序的节点,比如在 /lock/sku/000001, /lock/sku/000002, /lock/ sku/000003
其中sku 是要你进行写的公共资源。
如下图所示:三个client 同时想进行sku= 100121212 进行扣钱库存,那么sku = 100121212 就是共享资源,需要进行加锁,三个client 就会去创建临时顺序节点,
/lock/100121212,分别创建了 /lock/10012121/001,/lock/10012121/002 ,/lock/10012121/003
- 创建节点成功后,获取 /lock/sku 目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点,如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。
- 如果当前线程创建的节点不是所有节点序号最小的节点,则对节点序号的前一个节点添加一个事件监听。如下图所示:
- 前一个节点被删除了,那么就会被监听到,此时又会获取临时顺序节点的集合,看自己是不是最小的,如果是,那么就获到了,如果不是继续进行监听。
如下图所示,001 被删除了,那么client2 就会监听到001 被删除了,于是再次获取到子节点集合,判断自己已经示最小的节点了,那么获取锁成功了。
临时顺序节点代码
这里代码不做过多解释了,给了主要类的实现说明,可以和上面的原理对应上的。
1 实现了阻塞获取锁
2 实现了非阻塞获取锁
3 锁的可重入性
lock 接口 ,定义要实现获取锁和释放锁的方法
public interface Lock {
/**
* 阻塞获取锁
* @return
*/
void lock(String source) throws LockException;
/**
* 非阻塞获取锁
* @return
*/
boolean nonLock(String source,int retries);
/**
* 释放锁
*/
boolean unLock();
}
lock 接口的实现
@Slf4j
@RequiredArgsConstructor
@Component
@Scope(value = "prototype")
public class ZookeeperLock implements Lock {
private final CuratorFramework curatorFramework;
private final Watcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.NodeDeleted){
notifyAllFromWatcher();
}
};
/**
* key: lock path
* value: 重入的次数
*/
private static final TransmittableThreadLocal<LockInfo> THREAD_LOCAL = new TransmittableThreadLocal<>();
private LockInfo getLocalMap(){
LockInfo lockInfo = THREAD_LOCAL.get();
if (lockInfo == null){
lockInfo = new LockInfo();
THREAD_LOCAL.set(lockInfo);
}
return lockInfo;
}
private String createLockPath(String source) {
String base = "/" + source;
// 创建临时节点,这里肯定谁最小是谁先创建出来
String currentPath = null;
try {
currentPath = curatorFramework.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(base+"/lock_");
} catch (Exception e) {
e.printStackTrace();
}
return currentPath;
}
private String getLockPath(){
final LockInfo lockInfo = getLocalMap();
return lockInfo.lockPath;
}
private String getBasePath(){
final LockInfo lockInfo = getLocalMap();
return "/"+lockInfo.source;
}
private String getSource(){
final LockInfo lockInfo = getLocalMap();
return lockInfo.source;
}
private void setLock(){
final LockInfo lockInfo = getLocalMap();
lockInfo.lock = true;
}
private void incCount(){
final LockInfo lockInfo = getLocalMap();
lockInfo.count++;
}
private void deCount(){
final LockInfo lockInfo = getLocalMap();
lockInfo.count--;
}
private int getCount(){
return getLocalMap().count;
}
private boolean isLock(){
return getLocalMap().lock;
}
private synchronized void notifyAllFromWatcher(){
notifyAll();
}
private void nextLock() throws LockException {
boolean deleted = false;
try {
// 不相等,那么说明有比它大的,那么找出它的弟弟节点,进行监听
//监听上一个节点
final List<String> childrenPath = curatorFramework.getChildren().forPath(getBasePath());
final String youngerBrother = CommonUtil.getYoungerBrother(getSource(), childrenPath, getLockPath());
//如果为空个,说明就剩下她自己一个了,那么直接返回获取
if (StringUtils.isEmpty(youngerBrother)){
log.error("currentThread=> "+Thread.currentThread().getId()+"'s youngerBrother is null ");
lock(getSource());
return;
}
curatorFramework.getData().usingWatcher(watcher).forPath(youngerBrother);
synchronized(this){
wait();
}
lock(getSource());
}catch (Exception e){
//如果是 NoNodeException ,说明我监听的节点不存了,那么如要继续获取锁
if(e instanceof KeeperException.NoNodeException){
lock(getSource());
return;
}
e.printStackTrace();
deleted = true;
throw new LockException("获取锁失败:" + e.getMessage());
}finally {
// 等待超时,没有获取到锁,那么删除zookeeper 中的临时节点和thread local内的数据
if (deleted){
removeResource();
throw new LockException("超时-获取锁失败");
}
}
}
private void initCurrentLock(String source) throws LockException {
final LockInfo lockInfo = getLocalMap();
//如果为空,那么说第一次尝试获取锁
if (StringUtils.isEmpty(lockInfo.lockPath)){
String lockPath = createLockPath(source);
if (StringUtils.isEmpty(lockPath)){
throw new LockException("创建锁失败,请稍后重试");
}
lockInfo.source = source;
lockInfo.lockPath = lockPath;
}
}
@Override
public void lock(String source) throws LockException {
initCurrentLock(source);
//如果获得了,那么不要继续了
if (lockResource()){
return;
}
try {
//阻塞 监听
nextLock();
} catch (Exception e) {
e.printStackTrace();
throw new LockException("获取锁失败,请稍后重试");
}
}
@Override
public boolean nonLock(String source,int retries){
boolean notLock = false;
try {
initCurrentLock(source);
while (retries>0){
if (lockResource()){
return true;
}
retries--;
}
if (retries==0){
notLock = true;
}
} catch (LockException e) {
e.printStackTrace();
log.error("上锁失败: {} ",e.getMessage());
notLock = true;
return false;
}finally {
//如果出现异常了,那么一定要删除
if (notLock){
removeResource();
}
}
return !notLock;
}
@Override
public boolean unLock() {
if (getCount()>1){
deCount();
return true;
}
return removeResource();
}
private boolean removeResource(){
try {
String lockPath = getLockPath();
THREAD_LOCAL.remove();
if (!StringUtils.isEmpty(lockPath) && curatorFramework.checkExists().forPath(lockPath)!=null){
curatorFramework.delete().forPath(lockPath);
}
} catch (Exception e) {
e.printStackTrace();
//todo: 如果删除锁失败了,那么要记录日志,同时报警,进行人工干预
return false;
}
return true;
}
private boolean lockResource() throws LockException {
//判断是否重入了
if (isLock()){
incCount();
return true;
}
String lockPath = getLockPath();
String basePath = getBasePath();
int currentNumber = CommonUtil.getNumber(lockPath);
List<String> childrenPath;
try {
childrenPath = curatorFramework.getChildren().forPath(basePath);
} catch (Exception e) {
e.printStackTrace();
throw new LockException("获取锁列表失败");
}
// 获取所有节点的最小节点数字
int minNumber = CommonUtil.getMin(childrenPath);
//如果相等,那么它就是最小的,获得锁
if (currentNumber == minNumber){
System.out.println("lock thread: " + Thread.currentThread().getId() +" , lock number: " + currentNumber);
setLock();
incCount();
return true;
}
return false;
}
@Data
public static class LockInfo{
private String source;
private String lockPath;
private int count;
private boolean lock = false;
}
}
源码地址
具体源码地址: github 点击
总结
zookeeper分布式锁的实现方式
- 共同创建临时节点的方式,会引起“惊群”效应,并发量不能太高
- 临时顺序接的方式,只会监听上一个顺序节点,性能会很高。
最后
以上就是细心蚂蚁为你收集整理的分布式锁的实现- zookeeper的全部内容,希望文章能够帮你解决分布式锁的实现- zookeeper所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复