概述
手写代码模拟zookeeper分布式锁
- 1.zookeeper服务器
- 2.代码模拟分布式锁
- 2.1 maven工程
- 2.2 编写分布式锁实现类
- 2.3 编写测试类
- 2.4 执行效果
- 3.小结
1.zookeeper服务器
安装并准备zookeeper服务器
2.代码模拟分布式锁
2.1 maven工程
新建maven工程,导入zookeeper依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
2.2 编写分布式锁实现类
- 定义类成员变量
/**
- 分布式锁,继承Lock接口,并实现
-
- @author fenghe
*/
public class DistributedLock implements Lock {
/**
* 保存一个zookeeper的实例,用于连接zookeeper
*/
private ZooKeeper zooKeeper;
/**
* 默认根节点,持久化的节点
*/
private final String ROOT_LOCK_DIR = "/locks";
/**
* 尝试获得锁时,新建的节点路径
*/
private String CURRENTLOCK = null;
/**
* 未获得锁时,需要等待释放的节点路径
*/
private String WAITLOCK = null;
/**
* 用于处理等待节点的通知
*/
private CountDownLatch latch;
}
- 实现构造函数
/**
* 构造函数,传入zookeeper连接参数
* @param connectString
*/
public DistributedLock(String connectString) {
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
//1.新建zookeeper连接
zooKeeper = new ZooKeeper(connectString, 4000, new Watcher() {
public void process(WatchedEvent event) {
//2.连接成功后,唤醒countDownLatch
if (KeeperState.SyncConnected.equals(event.getState())) {
countDownLatch.countDown();
}
}
});
//3.等待连接成功后被唤醒
countDownLatch.await();
//4.判断ROOT_LOCK_DIR节点是否存在
Stat stat = zooKeeper.exists(ROOT_LOCK_DIR, false);
if (stat == null) {
//5.如果不存在就新建持久化节点
//PS:这里会有异步问题,最好保证这个节点在启动前已经创建,可以手动创建一个
zooKeeper.create(ROOT_LOCK_DIR, "0".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
- 实现tryLock()
public boolean tryLock() {
try {
//1.创建一个临时有序节点
CURRENTLOCK = zooKeeper.create(ROOT_LOCK_DIR+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName()+" ==> " + CURRENTLOCK + " 尝试获得分布式锁...");
//2.开始判断是否获得锁,即判断自己创建的节点序号是否最小
List<String> nodes = zooKeeper.getChildren(ROOT_LOCK_DIR, false);
SortedSet<String> set = new TreeSet<String>();
nodes.forEach((node)->{
set.add(ROOT_LOCK_DIR+"/"+node);
});
String first = set.first();
//3.如果最小序号的节点就是自己创建的节点,则表示自己获得了锁
if(CURRENTLOCK.equals(first)){
return true;
}
//4.否则获得当前所有节点中序号比自己小的节点
SortedSet<String> less = set.headSet(CURRENTLOCK);
if(!less.isEmpty()){
//5.获得所有比自己小的节点中最大的一个,设置为等待锁,即这个节点删除时,自己就可以获得锁
WAITLOCK = less.last();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
- 实现lock()
public void lock() {
//1.trylock返回true则表示获得锁
if(this.tryLock()){
System.out.println(Thread.currentThread().getName() + "==>" + CURRENTLOCK + " 获得锁...");
}else{
//2.反之则需要等待WAITLOCK节点释放
try {
//3.注册WAITLOCK的Watcher,当该节点删除时,唤醒latch
Stat stat = zooKeeper.exists(WAITLOCK, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(latch!=null){
latch.countDown();
}
}
});
if(stat!=null){
System.out.println(Thread.currentThread().getName() + "==>等待锁==>" + WAITLOCK + " 释放...");
//4. 等待WAITLOCK删除后被唤醒
latch = new CountDownLatch(1);
latch.await();
//5. 唤醒即表示成功获得锁
System.out.println(Thread.currentThread().getName() + "==>" + CURRENTLOCK + " 获得锁...");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 实现unlock()
public void unlock() {
try {
System.out.println(Thread.currentThread().getName()+" ==> " + CURRENTLOCK + " 释放锁...");
//1.删除临时节点
zooKeeper.delete(CURRENTLOCK, -1);
CURRENTLOCK = null;
//2.关闭连接
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
2.3 编写测试类
public static void main(String[] args) throws IOException {
CountDownLatch countDownLatch = new CountDownLatch(5);
//1.模拟5个客户端通知竞争分布式锁
for(int i=0;i<5;i++){
new Thread(new Runnable() {
public void run() {
try {
countDownLatch.await();
DistributedLock distributedLock = new DistributedLock("127.0.0.1:2181");
//2.新建分布式锁,并请求锁定
distributedLock.lock();
//3.模拟业务逻辑,在2秒内随机睡眠
Thread.sleep(Math.round(2000));
//4.释放锁
distributedLock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"Thread"+i).start();
countDownLatch.countDown();
}
System.in.read();
}
2.4 执行效果
2019-04-02 00:37:01,470 Thread1 ==> /locks/0000000000 尝试获得分布式锁...
2019-04-02 00:37:01,470 Thread0 ==> /locks/0000000002 尝试获得分布式锁...
2019-04-02 00:37:01,471 Thread2 ==> /locks/0000000003 尝试获得分布式锁...
2019-04-02 00:37:01,471 Thread3 ==> /locks/0000000001 尝试获得分布式锁...
2019-04-02 00:37:01,475 Thread1==>/locks/0000000000 获得锁...
2019-04-02 00:37:01,476 Thread4 ==> /locks/0000000004 尝试获得分布式锁...
2019-04-02 00:37:01,477 Thread0==>等待锁==>/locks/0000000001 释放...
2019-04-02 00:37:01,477 Thread3==>等待锁==>/locks/0000000000 释放...
2019-04-02 00:37:01,477 Thread2==>等待锁==>/locks/0000000002 释放...
2019-04-02 00:37:01,478 Thread4==>等待锁==>/locks/0000000003 释放...
2019-04-02 00:37:03,480 Thread1 ==> /locks/0000000000 释放锁...
2019-04-02 00:37:03,485 Thread3==>/locks/0000000001 获得锁...
2019-04-02 00:37:03,485 Session: 0x100002084d00074 closed
2019-04-02 00:37:03,487 EventThread shut down for session: 0x100002084d00074
2019-04-02 00:37:05,490 Thread3 ==> /locks/0000000001 释放锁...
2019-04-02 00:37:05,493 Thread0==>/locks/0000000002 获得锁...
2019-04-02 00:37:05,494 Session: 0x100002084d00072 closed
2019-04-02 00:37:05,494 EventThread shut down for session: 0x100002084d00072
2019-04-02 00:37:07,497 Thread0 ==> /locks/0000000002 释放锁...
2019-04-02 00:37:07,500 Thread2==>/locks/0000000003 获得锁...
2019-04-02 00:37:07,502 Session: 0x100002084d00070 closed
2019-04-02 00:37:07,502 EventThread shut down for session: 0x100002084d00070
2019-04-02 00:37:09,501 Thread2 ==> /locks/0000000003 释放锁...
2019-04-02 00:37:09,504 Thread4==>/locks/0000000004 获得锁...
2019-04-02 00:37:09,505 Session: 0x100002084d00073 closed
2019-04-02 00:37:09,505 EventThread shut down for session: 0x100002084d00073
2019-04-02 00:37:11,506 Thread4 ==> /locks/0000000004 释放锁...
2019-04-02 00:37:11,510 Session: 0x100002084d00071 closed
2019-04-02 00:37:11,510 EventThread shut down for session: 0x100002084d00071
可以看到第一次Thread1获得了锁,其临时节点为/locks/0000000000
,而Thread3正在等待/locks/0000000000
节点的释放;00:37:03时,Thread1释放了锁,即删除了/locks/0000000000
节点,之后Thread3马上就获得了锁,以此类推,之后的线程也在前一个节点删除后,显示获得锁成功。
3.小结
通过代码模拟分布式锁的实现,帮助理解分布式锁的概念,及其设计思路。
- 通过zookeeper临时有序节点的原理实现分布式锁
- 根据节点序号判断是否获得锁
- 利用watcher机制,链式监听前一个节点来感知是否删除,并确定自身锁的获得
- 临时节点的特性能够保证获得锁的客户端即使异常中断,也不影响后续锁的获得
- Curator封装了各种zookeeper的api,包括分布式锁的实现(上面的代码只是示意)
最后
以上就是隐形蓝天为你收集整理的手写代码模拟zookeeper分布式锁的全部内容,希望文章能够帮你解决手写代码模拟zookeeper分布式锁所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复