概述
文章目录
- Zookeeper
- 什么是Zookeeper
- Zookeeper的特点
- Zookeeper的数据结构
- 配置参数解读
- 选举机制(!!!重点 !!!)
- 第一次启动
- 非第一次启动
- 节点类型
- Stat结构体
- 监听器的原理(!!!重点!!!)
- 监听器
- 节点删除
- API的操作
- 监听
- 判断节点是否存在
- 写流程之写入请求发送给follower
- 写流程之写入请求发送给Follower
- 服务器动态上下线
- Zookeeper分布式锁案例
- 面试重点
Zookeeper
什么是Zookeeper
zookeeper = 文件系统 + 通知机制
从设计模式角度理解,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper就将负责通知在Zookeeper上注册的观察者作出相关的反应。
Zookeeper的特点
- Zookeeper有一个领导者(Leader),多个跟随者(Follower)
- Zookeeper上的节点,只有超过半数以上存活才会提供服务
- 全局数据一致,每个Server保存一份相同的数据,Client无论连接到哪一个Server上数据都是一致的
- 更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行
- 数据更新原子性,一次数据更新要么成功,要么都不成功
- 实时性,在一定时间内,Client能读到最新数据
Zookeeper的数据结构
类似于Linux的数据结构,由根节点出发,到达多个子节点,每一个节点叫做ZNode。
默认:每个ZNode能够存储 1M 的数据
配置参数解读
- tickTime=2000:通信心跳数,Zookeeper和客户端或者服务器之间,心跳时间,单位毫秒(2秒)。也就是每隔两秒一次心跳
- initLimit=10:初始通信时间限制,单位是多少次心跳(10次心跳),2秒心跳一次,也就是20秒,初始启动服务的时候,20秒内需要有应答
- syncLimit=5:同步通信时间限制,Leader和Follower之间的通信响应时长。如果超过syncLimit * tickTime,Leader就会认为Follower死掉了,会从服务器列表中将改Follower删除
- dataDir:Zookeeper保存数据的地方,默认是保存在tmp(因为tmp容易被删除,所以一般不用tmp)
- clientPort=2181:客户端的连接端口
选举机制(!!!重点 !!!)
第一次启动
说明:有5台服务器
- ①号服务器启动,发起一次选举,先给自己投一票
查看票数:①号:1票
票数没有过半,①号保持为Looking - ②号服务器启动,发起一次选举,又先给自己投一票,和①号服务器交换投票信息,①号发现②号的myid比它大,①号更改自己的选票,改投给了②号
查看票数:①号:0票 | ②号:2票
票数没有过半,①号②号保持Looking - ③号服务器启动,发起一次选举,先给自己投一票,此时①号和②号都发现③号的myid都比他们大,所以他们的票都改投为③号
查看票数:①号:0票 | ②号:0票 | ③号:3票
票数已经过半了,③号当选为Leader,①号②号为Follower - ④号服务器启动:发起一次选举,先给自己投一票,因为①号②号是Follower,3号是Leader,他们都不是Looking的状态了,所以他们不会更改选票了
查看票数:①号:0票 | ②号:0票 | ③号:3票 | ④号:1票
所以④号的状态为Follower - ⑤号服务器启动:和④号一样,为Follower
非第一次启动
说明:还是上面5台机器,③号为Leader,现在③号和⑤号宕机了
一.、有两种情况会进入Leader选举
1.服务器初始化启动(上面的情况)
2.服务器运行期间无法和Leader保持连接(这种情况)
二、机器进入Leader选举的时候,二回有两种状态
1.集群本来就有Leader
机器去尝试选举Leader时候,会被告知当前Leader服务器的信息,所以改机器只需要和Leader建立连接就好
2.集群不存在Leader
SID:服务器ID,和myid一致
ZXID:事物ID,用来表示一次服务器状态的变更。
Epoch:每个Leader任期的代号
现在③号和⑤号宕机了
①②④号进行投票:
选举规则:Epoch => ZXID => SID(先比Epoch,再比ZXID,再比SID)
(Epoch,ZXID,SID)
假设 ①:(1,8,1) | ②:(1,8,2) | ④:(1,7,4)
所以②号为Leader,①④为Follower
节点类型
- 持久化目录节点(Persistent):客户端和Zookeeper断开后,依旧存在
create /sanguo "diaochan"
- 持久化顺序编号节点(Persistent_sequence):客户端和Zookeeper断开后,依旧存在,Zookeeper对该节点名称进行排序
create -s /sanguo "diaochan"
- 临时目录节点(Ephemeral):客户端和Zookeeper断开后,不存在
create -e /sanguo/wuguo
- 临时顺序编号节点(Ephemeral_sequence):客户端和Zookeeper断开后,不存在,Zookeeper对该节点名称进行排序
create -e -s /sanguo/wuguo
Stat结构体
1)czxid-创建节点的事务zxid
每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。
事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
2)ctime - znode被创建的毫秒数(从1970年开始)
3)mzxid - znode最后更新的事务zxid
4)mtime - znode最后修改的毫秒数(从1970年开始)
5)pZxid-znode最后更新的子节点zxid
6)cversion - znode子节点变化号,znode子节点修改次数
7)dataversion - znode数据变化号
8)aclVersion - znode访问控制列表的变化号
9)ephemeralOwner- 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。
10)dataLength- znode的数据长度
11)numChildren - znode子节点数量
监听器的原理(!!!重点!!!)
- 有一个Main线程
- 在main线程中创建zkClient(zookeeper客户端),这个时候会创建两个线程,一个负责网络连接通信(connect),一个负责监听(Listener)
- 通过connect线程将注册的监听事件发送给Zookeeper
- 在zookeeper中注册的监听器列表中添加注册监听事件
- zookeeper监听到有数据或者路径的变化,就会将这个消息发送给Listener
- Listener线程内部调用了process方法
监听器
注册一次,只能监听一次
ls -w /sanguo
监听/sanguo
改变三国的值
发生变化
我再修改一次
状态还是一样
所以就是 注册一次,只能监听一次
节点删除
delete /path
查看/sanguo,删除/sanguo/jin,查看,删除成功
deleteall /path
查看/sanguo,删除/sangu,发现它说节点不为空,不给删除,那我们就deleteall,再次查看,节点已经不存在,那就是已经被删除了
API的操作
节点创建成功
监听
运行,然后在shell窗口,创建一个节点
发现控制台没有反应,因为我们值注册了一次,所以只能监听一次。
我们需要把创建监听器的方法放到,process方法里就可以一直监听了
在shell里创建一个节点
控制台输出打印
删除
可以看到,这里的是持续的监听,多次注册
判断节点是否存在
控制台输出
写流程之写入请求发送给follower
- 客户端向Leader发送写入数据,Leader先自己写一份
- 传给Follower,Follower再写一份
- 给Leader应答,已经写好了
- 因为写入数量已经超过半数,所以 Leader给客户端应答
- Leader继续给下一个Follower传数据
- Follower写入完毕,给Leader应答
写流程之写入请求发送给Follower
- 客户端向Follower发送写入数据
- Follower把请求转发给Leader
- Leader先把数据自己写一份,然后给Follower应答写入
- Follower写入后, 给Leader应答
- Leader看到写入数量过半然后就给Follower应答,可以给客户端应答了
- Follower才向客户端应答
- Leader向另一个Follower发送写入数据
- Follower写入成功,告诉Leader写入成功了
服务器动态上下线
- 服务器的上线,服务端启动时,创建的是临时有序的节点(因为服务下线后,zookeeper不会保留已经下线的服务器,并且服务器有多个,所以是有序的)
- 客户端进行监听,getChildren获取当前的在线服务器列表,并且注册监听
- 服务器下线
- 服务器下线通知
先创建一个/servers节点
服务器注册:
package case1;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 获取zk连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 注册服务器到zk集群
server.regist(args[0]);
// 启动业务逻辑(睡觉)
server.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void regist(String hostname) throws KeeperException, InterruptedException {
// /servers创建的路径,值为主机名(服务器名),监听权限为开放,节点类型是临时有序的
String create = zk.create("/servers", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online");
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
}
客户端监听:
package case1;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
// 获取zk连接
DistributeClient client = new DistributeClient();
client.getConnect();
// 监听/servers 下面子节点的增加和删除
client.getServerList();
// 业务逻辑(睡觉)
client.business();
}
private void business() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void getServerList() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/servers", true);
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
// 打印
System.out.println(servers);
}
private void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
try {
getServerList();
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
先把客户端监听启动
在shell里创建一个hadoop01
查看控制台,打印信息
尝试下线操作
这里运行服务端上线
这里监控到了101已上线
改变一下
Zookeeper分布式锁案例
package case2;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class DistributedLock {
private String connectString = "hadoop01:2181,hadoop02:2181,hadoop03:2181";
private int sessionTimeout = 2000;
private final ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath;
private String currentMode;
public DistributedLock() throws IOException, InterruptedException, KeeperException {
// 获取连接
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// connectLath 如果连接上zk 可以释放
if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
// waitLatch 需要释放
if(watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
waitLatch.countDown();
}
}
});
// 等待zk正常连接后,往下走程序
connectLatch.await();
// 判断根节点 /locks 是否存在
Stat stat = zk.exists("/locks", false);
// 如果/locks为空,不存在,我们就创建根节点
if (stat == null) {
// 创建根节点
zk.create("/locks", "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 对zk加锁
public void zkLock() {
// 创建对应的临时带序号节点
try {
currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren("/locks", false);
//如果children只有一个值,那就直接获取锁,如果有多个节点,需要判断谁最小
if (children.size() == 1) {
return;
} else {
// 对children里面的值进行排序,排序以后里面的值就是有序递增的
Collections.sort(children);
// 获取节点名称 seq-0000000
String thisNode = currentMode.substring("/locks/".length());
// 通过seq-00000000获取该节点在children集合的位置
int index = children.indexOf(thisNode);
// 判断
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// 就一个节点可以获取锁
return;
} else {
// 监听前一个节点
waitPath = "/locks/" + children.get(index - 1);
zk.getData(waitPath, true, null);
// 等待监听
waitLatch.await();
return;
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
// 判断创建的节点是否是最小的需要及诶单,如果是获取到锁,如果不是,监听前一个节点
}
// 解锁
public void unZkLock() throws KeeperException, InterruptedException {
// 删除节点
zk.delete(currentMode,-1);
}
}
面试重点
5.1 请简述ZooKeeper的选举机制
分两种情况,第一种就是初始化服务器(第一次启动),每个人先给自己投一票,然后对比myid,看谁的大,投票就改投为大的,当选票超过半数,Leader产生。
第二种,分两种,一种是Leader挂了,他们会先对比Epoch,看谁以前当过Leader,再看ZXID,事务的ID,看谁做的多,最后对比myid。
另一种就是Follower挂了,再尝试选举的时候,它会得知集群中Leader的信息,所以它只要和Zookeeper连接上就可以了。
5.2 ZooKeeper的监听原理是什么?
首先得有一个main()线程,创建zookeeper的连接,这个时候会创建两个线程,一个是Listener和connect,connect负责连接服务器,将注册的监听事件发送给zookeeper,zookeeper将发过来的注册监听事件存入到注册监听列表里,如果监听到数据或者路径有变化,zookeeper将消息发送给Listener,Listener创建一个process进程
5.3 ZooKeeper的部署方式有哪几种?集群中的角色有哪些?集群最少需要几台机器?
(1)部署方式单机模式、集群模式
(2)角色:Leader和Follower
(3)集群最少需要机器数:3(奇数)
5.4 ZooKeeper的常用命令
ls create get delete set…
最后
以上就是爱笑钢笔为你收集整理的Zookeeper知识点Zookeeper的全部内容,希望文章能够帮你解决Zookeeper知识点Zookeeper所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复