分布式锁
互联网初期,我们系统一般都是单点部署,也就是在一台服务器完成系统的部署,后期随着用户量的增加,服务器的压力也越来越大,响应速度越来越慢,甚至出现服务器崩溃的情况。
为解决服务器压力太大,响应慢的特点,分布式系统部署出现了。
简单的说,就是我们将系统资源部署到多台服务器中,然后使用一台服务器做入口代理,根据一些决策将接收到的请求转发到资源服务器,这也就是我们常说的 反向代理(一般就是使用nginx)
虽然分布式解决了服务器压力的问题,但也带来了新的问题。
比如,我们有一个下单统计的功能,当完成下单后,需要执行统计功能,而在高访问的情况下,可能有两个下单请求(A和B)同时完成,然后一起执行了统计功能,这样可能导致的结果就是A请求未将B请求数据统计在内,而B请求可能也未将A请求数据统计在内,这样就造成了数据的统计错,这个问题的产生的根本原因就是统计功能的并发导致的,如果是单点部署的系统,我们简单的使用一个锁操作就能完成了,但是在分布式环境下,A和B请求可能同时运行在两个服务器中,普通的锁就不能起到效果了,这个时候就要使用分布式锁了。
Zookeeper分布式锁原理
分布式锁的实现发放有多种,简单的,我们可以使用数据库表去实现它,也可以使用redis去实现它,这里要使用的Zookeeper去实现分布式锁
Zookeeper分布式锁的原理是巧妙的是使用了znode临时节点的特点和监听(watcher)机制,监听机制很简单,就是我们可以给znode添加一个监听器,当znode节点状态发生改变时(如:数据内容改变,节点被删除),会通知到监听器。
前面几节介绍过znode有三种类型
PERSISTENT:持久节点,即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
EPHEMERAL:临时节点,客户端是连接状态时,临时节点就是有效的。当客户端与ZooKeeper集合断开连接时,临时节点会自动删除。临时节点不允许有子节点。临时节点在leader选举中起着重要作用。
SEQUENTIAL:顺序节点,可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径,顺序节点在锁定和同步中起重要作用。
其中,顺序节点,可以是持久的或临时的,而临时节点有个特点,就是它属于创建它的那个会话,当会话断开,临时节点就会自动删除,如果在临时节点上注册了监听器,那么监听器就会收到通知,如果临时节点有了时间顺序,那我们为实现分布式锁就又有一个想法:
假如在Zookeeper中有一个znode节点/Locker
1、当client1连接Zookeeper时,先判断/Locker节点是否存在子节点,如果没有子节点,那么会在/Locker节点下创建一个临时顺序的znode节点,假如是/client1,表示client1获取了锁状态,client1可以继续执行。
2、当client2连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client1节点上注册一个监听器(watcher1),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client2。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher1)中的。
3、当client3连接Zookeeper时,先判断/Locker节点是否存在子节点,发现已经存在子节点了,然后获取/Locker下的所有子节点,同时按时间顺序排序,在最后一个节点,也就是/client2节点上注册一个监听器(watcher2),同时在/Locker节点下创建一个临时顺序的znode节点,假如是/client3。同时client2将被阻塞,而阻塞状态的释放是在监听器(watcher2)中的。
以此类推。
4、当client1执行完操作了,断开Zookeeper的连接,因为/client1是临时顺序节点,于是将会自动删除,而client2已经往/client1节点中注册了一个监听器(watcher1),于是watcher1将会受到通知,而watcher1又会释放client2的阻塞状态。于是client2获取锁状态,继续执行。
5、当client2执行完操作了,断开Zookeeper的连接,因为/client2是临时顺序节点,于是将会自动删除,而client3已经往/client2节点中注册了一个监听器(watcher2),于是watcher2将会受到通知,而watcher2又会释放client3的阻塞状态。于是client3获取锁状态,继续执行。
以此类推。
这样,不管分布式环境中有几台服务器,都可以保证程序的排队似的执行了。
C#实现Zookeeper分布式锁
上一节有封装过一个ZookeeperHelper的辅助类(Zookeeper基础教程(四):C#连接使用Zookeeper),使用这个辅助类实现了一个ZookeeperLocker类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace AspNetCore.ZookeeperConsole { /// <summary> /// 基于Zookeeper的分布式锁 /// </summary> public class ZookeeperLocker : IDisposable { /// <summary> /// 单点锁 /// </summary> static object locker = new object(); /// <summary> /// Zookeeper集群地址 /// </summary> string[] address; /// <summary> /// Zookeeper操作辅助类 /// </summary> ZookeeperHelper zookeeperHelper; /// <summary> /// 构造函数 /// </summary> /// <param name="lockerPath">分布式锁的根路径</param> /// <param name="address">集群地址</param> public ZookeeperLocker(string lockerPath, params string[] address) : this(lockerPath, 0, address) { } /// <summary> /// 构造函数 /// </summary> /// <param name="lockerPath">分布式锁的根路径</param> /// <param name="sessionTimeout">回话过期时间</param> /// <param name="address">集群地址</param> public ZookeeperLocker(string lockerPath, int sessionTimeout, params string[] address) { this.address = address.ToArray(); zookeeperHelper = new ZookeeperHelper(address, lockerPath); if (sessionTimeout > 0) { zookeeperHelper.SessionTimeout = sessionTimeout; } if (!zookeeperHelper.Connect()) { throw new Exception("connect failed:" + string.Join(",", address)); } lock (locker) { if (!zookeeperHelper.Exists())//根节点不存在则创建 { zookeeperHelper.SetData("", "", true); } } } /// <summary> /// 生成一个锁 /// </summary> /// <returns>返回锁名</returns> public string CreateLock() { var path = Guid.NewGuid().ToString().Replace("-", ""); while (zookeeperHelper.Exists(path)) { path = Guid.NewGuid().ToString().Replace("-", ""); } return CreateLock(path); } /// <summary> /// 使用指定的路径名称设置锁 /// </summary> /// <param name="path">锁名,不能包含路径分隔符(/)</param> /// <returns>返回锁名</returns> public string CreateLock(string path) { if (path.Contains("/")) { throw new ArgumentException("invalid path"); } return zookeeperHelper.SetData(path, "", false, true); } /// <summary> /// 获取锁 /// </summary> /// <param name="path">锁名</param> /// <returns>如果获得锁返回true,否则一直等待</returns> public bool Lock(string path) { return LockAsync(path).GetAwaiter().GetResult(); } /// <summary> /// 获取锁 /// </summary> /// <param name="path">锁名</param> /// <param name="millisecondsTimeout">超时时间,单位:毫秒</param> /// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns> public bool Lock(string path, int millisecondsTimeout) { return LockAsync(path, millisecondsTimeout).GetAwaiter().GetResult(); } /// <summary> /// 异步获取锁等等 /// </summary> /// <param name="path">锁名</param> /// <returns>如果获得锁返回true,否则一直等待</returns> public async Task<bool> LockAsync(string path) { return await LockAsync(path, System.Threading.Timeout.Infinite); } /// <summary> /// 异步获取锁等等 /// </summary> /// <param name="path">锁名</param> /// <param name="millisecondsTimeout">超时时间,单位:毫秒</param> /// <returns>如果获得锁返回true,否则等待指定时间后返回false</returns> public async Task<bool> LockAsync(string path, int millisecondsTimeout) { var array = await zookeeperHelper.GetChildrenAsync("", true); if (array != null && array.Length > 0) { var first = array.FirstOrDefault(); if (first == path)//正好是优先级最高的,则获得锁 { return true; } var index = array.ToList().IndexOf(path); if (index > 0) { //否则添加监听 var are = new AutoResetEvent(false); var watcher = new NodeWatcher(); watcher.NodeDeleted += (ze) => { are.Set(); }; if (await zookeeperHelper.WatchAsync(array[index - 1], watcher))//监听顺序节点中的前一个节点 { if (!are.WaitOne(millisecondsTimeout)) { return false; } } are.Dispose(); } else { throw new InvalidOperationException($"no locker found in path:{zookeeperHelper.CurrentPath}"); } } return true; } /// <summary> /// 释放资源 /// </summary> public void Dispose() { zookeeperHelper.Dispose(); } } }
现在写个程序可以模拟一下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46using System; using System.Collections.Generic; using System.Text; using System.Threading; namespace AspNetCore.ZookeeperConsole { class Program { static void Main(string[] args) { //Zookeeper连接字符串,采用host:port格式,多个地址之间使用逗号(,)隔开 string[] address = new string[] { "192.168.209.133:2181", "192.168.209.133:2181", "192.168.209.133:2181" }; //会话超时时间,单位毫秒 int sessionTimeOut = 10000; //锁节点根路径 string lockerPath = "/Locker"; for (var i = 0; i < 10; i++) { string client = "client" + i; //多线程模拟并发 new Thread(() => { using (ZookeeperLocker zookeeperLocker = new ZookeeperLocker(lockerPath, sessionTimeOut, address)) { string path = zookeeperLocker.CreateLock(); if (zookeeperLocker.Lock(path)) { //模拟处理过程 Console.WriteLine($"【{client}】获得锁:{DateTime.Now}"); Thread.Sleep(3000); Console.WriteLine($"【{client}】处理完成:{DateTime.Now}"); } else { Console.WriteLine($"【{client}】获得锁失败:{DateTime.Now}"); } } }).Start(); } Console.ReadKey(); } } }
运行结果如下:
可以发现,锁功能是实现了的
如果程序运行中打印日志:Client session timed out, have not heard from server in 8853ms for sessionid 0x1000000ec5500b2
或者直接抛出异常:org.apache.zookeeper.KeeperException.ConnectionLossException:“Exception_WasThrown”
只需要适当调整sessionTimeOut时间即可
以上就是C# 实现Zookeeper分布式锁的参考示例的详细内容,更多关于C# 实现Zookeeper分布式锁的资料请关注靠谱客其它相关文章!
最后
以上就是无辜柚子最近收集整理的关于C# 实现Zookeeper分布式锁的参考示例的全部内容,更多相关C#内容请搜索靠谱客的其他文章。
发表评论 取消回复