我是靠谱客的博主 土豪战斗机,这篇文章主要介绍分布式锁-zk临时节点,现在分享给大家,希望可以做个参考。

多线程访问同一个共享资源时,会出现并发问题,synchronized或者lock 类的锁只能控制单一进程的资源访问,多进程下就需要用到分布式锁

利用zk 可以实现独占锁,(同级节点唯一性)多个进程往zk指定节点下创建一个相同名称的节点,只有一个能成功,创建失败的通过zk的watcher机制监听子节点变化,一个监听到子节点删除事件,会再次触发所有进程的写锁,但这里会有惊群效应,会影响到性能

利用有序节点实现分布式锁:每个客户端都往一个指定节点(locks)注册一个临时有序节点,越早创建的节点编号越小,最小编号的节点获得锁,通过监听比自己小的节点,当比自己小的节点删除后,客户端会收到watcher,再次判断自己的节点是不是所有节点最小的,是则获得锁;这种方式也会解决惊群问题

接下来我们来看实现:curator 分布式锁的使用
curator对锁封装了一层,提供了InterProcessMutex;还提供了leader 选举、分布式队列 InterProcessMutex 分布式可重入排他锁
InterProcessSemaphoreMutex 分布式排他锁
InterProcessReadWriteLock 分布式读写锁

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
public class LeaderSelectorClient extends LeaderSelectorListenerAdapter implements Closeable { private String name; //表示当前的进程 private LeaderSelector leaderSelector; //leader选举的API private CountDownLatch countDownLatch=new CountDownLatch(1); public LeaderSelectorClient(){ } public LeaderSelectorClient(String name) { this.name = name; } public LeaderSelector getLeaderSelector() { return leaderSelector; } public void setLeaderSelector(LeaderSelector leaderSelector) { this.leaderSelector = leaderSelector; } public void start(){ leaderSelector.start(); //开始竞争leader } @Override public void takeLeadership(CuratorFramework client) throws Exception { //如果进入当前的方法,意味着当前的进程获得了锁。获得锁以后,这个方法会被回调 //这个方法执行结束之后,表示释放leader权限 System.out.println(name+"->现在是leader了"); // countDownLatch.await(); //阻塞当前的进程防止leader丢失 } @Override public void close() throws IOException { leaderSelector.close(); } private static String CONNECTION_STR="zk集群地址,至少三台机器"; public static void main(String[] args) throws IOException { CuratorFramework curatorFramework = CuratorFrameworkFactory.builder(). connectString(CONNECTION_STR).sessionTimeoutMs(50000000). retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); curatorFramework.start(); LeaderSelectorClient leaderSelectorClient=new LeaderSelectorClient("ClientA"); LeaderSelector leaderSelector=new LeaderSelector(curatorFramework,"/leader",leaderSelectorClient); leaderSelectorClient.setLeaderSelector(leaderSelector); leaderSelectorClient.start(); //开始选举 System.in.read(); } } 我们来看下curator 实现分布式锁的原理,这里我把注释写在了代码中,所以把代码贴到一块 public InterProcessMutex(CuratorFramework client, String path) { // 实现公平锁的核心:zookeeper利用path 创建临时有序节点 this(client, path, new StandardLockInternalsDriver()); } public StandardLockInternalsDriver() { } public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) { this(client, path, "lock-", 1, driver); } //maxLeases:互斥锁 InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) { this.threadData = Maps.newConcurrentMap(); this.basePath = PathUtils.validatePath(path); //InterProcessMutex 把分布式锁的申请和释放委托给了 LockInternals internals this.internals = new LockInternals(client, driver, path, lockName, maxLeases); } // 无限等待 public void acquire() throws Exception { if (!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } } // 限时等待 public boolean acquire(long time, TimeUnit unit) throws Exception { return this.internalLock(time, unit); } private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread(); //同一线程再次acquire,首先判断threaData 是否有这个线程锁信息,如果有则原子+1,然后返回 InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData != null) { // 实现可重入; lockData.lockCount.incrementAndGet(); return true; // 映射表没有对应的锁信息,尝试通过LockInternals 获取锁 } else { String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes()); if (lockPath != null) { // 成功获取锁,存储到映射表中 InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath); this.threadData.put(currentThread, newLockData); return true; } else { return false; } } } // 记录线程和锁信息的映射关系 private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData; //zk 中一个临时有序节点对应一个锁,但是让锁生效需要排队 private static class LockData { final Thread owningThread; final String lockPath; final AtomicInteger lockCount; private LockData(Thread owningThread, String lockPath) { this.lockCount = new AtomicInteger(1); //分布式锁重入次数 this.owningThread = owningThread; this.lockPath = lockPath; } } //尝试获取锁,并返回锁对应的zk 临时有序节点路径 String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { long startMillis = System.currentTimeMillis(); // millisToWait 是个null Long millisToWait = unit != null ? unit.toMillis(time) : null; byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; //是否已经有分布式锁? boolean hasTheLock = false; //是否已经完成尝试获取分布式锁操作 boolean isDone = false; while(!isDone) { isDone = true; try { //driver = StandardLockInternalsDriver ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes); //循环等待激活分布式锁 hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath); } catch (NoNodeException var14) { if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { throw var14; } isDone = false; } } //成功获取分布式锁,返回有序节点的路径 return hasTheLock ? ourPath : null; } //在zk 中创建临时顺序节点 public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath; //默认内容是ip地址 if (lockNodeBytes != null) { // creatingParentContainersIfNeeded:创建父节点,如果不支持CreateMode.CONTAINER,就采用CreateMode.PERSISTENT // withProtection:临时节点添加GUID ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes); } else { ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path); } return ourPath; } //循环等待激活分布式锁 private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception { boolean haveTheLock = false; boolean doDelete = false; try { if (this.revocable.get() != null) { ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath); } while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) { List<String> children = this.getSortedChildren(); String sequenceNodeName = ourPath.substring(this.basePath.length() + 1); PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases); if (predicateResults.getsTheLock()) { haveTheLock = true; } else { String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch(); synchronized(this) { try { ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath); if (millisToWait == null) { this.wait(); } else { millisToWait = millisToWait - (System.currentTimeMillis() - startMillis); startMillis = System.currentTimeMillis(); if (millisToWait > 0L) { this.wait(millisToWait); } else { doDelete = true; break; } } } catch (NoNodeException var19) { } } } } } catch (Exception var21) { ThreadUtils.checkInterrupted(var21); doDelete = true; throw var21; } finally { if (doDelete) { this.deleteOurPath(ourPath); } } return haveTheLock; }

最后

以上就是土豪战斗机最近收集整理的关于分布式锁-zk临时节点的全部内容,更多相关分布式锁-zk临时节点内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(65)

评论列表共有 0 条评论

立即
投稿
返回
顶部