概述
Zookeeper机制详解
- 1. Zk的概念
- 2. zk的数据结构
- 3. Zk的简单使用(windows环境下)
- 4. zk的集群配置
- 5. zk怎么解决数据一致性
- 5.1 领导者选举机制
- 5.2 2pc机制
- 5.3 zk的ap和cp
- 6. zk部分源码查看
- 6.1 idea配置zookeeper集群
- 6.2 zk过半机制
- 7. zk集群启动、选举源码流程图
- 8. zk发布订阅机制
- 9. ZK的应用
- 9.1 分布式ID生成
1. Zk的概念
它是一个分布式框架,是Apache Hadoop的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、集群管理、分布式应用配置项(dubbo)的管理等。
- zk是一个拥有文件特点的数据库
- zk是一个解决了数据一致性问题的分布式数据库
- zk是一个具有发布和订阅功能的分布式数据库(watch)
统一命名服务:在分布式架构中,很多系统需要去创建资源,资源的名字怎么保证不冲突,可以利用zk这个特点来解决。
2. zk的数据结构
名称是由斜杠(/)分隔的一系列路径元素。ZooKeeper命名空间中的每个节点都由路径进行唯一标识。
与标准文件系统不同,ZooKeeper命名空间中的每个节点都可以具有与其关联的数据以及子节点。每一个节点都可以存储数据,只是需要注意的是存储的容量是有限,一般不能超过 1MiB。
Znode的类型分为三类:
- 持久节点(persistent node):持久节点就是节点被创建后会一直存在服务器,直到删除操作主动清除,这种节点也是最常见的类型。
create <path> <data>
- 临时节点(ephemeral node):临时节点就是会被自动清理掉的节点,它的生命周期和客户端会话绑在一起,客户端会话结束,节点会被删除掉。与持久性节点不同的是,临时节点不能创建子节点。
create -e <path> <data>
- 持久顺序节点(sequential node):持久顺序节点就是有顺序的持久节点,节点特性和持久节点是一样的,只是额外特性表现在顺序上。顺序特性实质是在创建节点的时候,会在节点名后面加上一个数字后缀,来表示其顺序。
create -s <path> <data>
[zk: localhost:2181(CONNECTED) 48] create -s /luo 1
Created /luo0000000004
[zk: localhost:2181(CONNECTED) 49] create -s /luo 2
Created /luo0000000005
[zk: localhost:2181(CONNECTED) 50] ls /
[luo0000000005, zhihong, luo0000000004, zookeeper]
3. Zk的简单使用(windows环境下)
开启zk服务端和客户端
- ZK客户端:创建节点
create /zhihong 1
zhihong:是节点路径,1:是节点数据
- 执行
ls /
查看节点
- 执行
get /zhihong
获取节点数据
cZxid
:创建事务的idctime
:创建时间mZxid
:修改的事务id,每次修改操作(set)后都会更新mZxid和mtime。mtime
:修改时间pZxid
:子节点最后更新的事务id,子节点有变化(创建create、修改set、删除delete,rmr)时,都会更新pZxid。cversion
:子节点的版本号。当子节点有变化(创建create、修改set、删除delete,rmr)时,cversion 的值就会增加1。dataVersion
:节点数据的版本号,每次对节点的数据进行修改操作(set)后,dataVersion的值都会增加1(即使设置的是相同的数据)。aclVersion
:节点ACL的版本号,每次节点的ACL进行变化时,aclVersion 的值就会增加1。ephemeralOwner
:当前节点是临时节点(ephemeral node )时,这个ephemeralOwner的值是客户端持有的session id,如果是持久性节点那么值为 0。dataLength
:节点存储的数据长度,单位为 B (字节)。numChildren
:直接子节点的个数。
- 执行
set /zhihong 2
覆盖节点数据
[zk: localhost:2181(CONNECTED) 5] help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
[zk: localhost:2181(CONNECTED) 6]
其实通过执行 help
就能查看基本的命令使用
4. zk的集群配置
首先复制三个文件
然后配置每个zookServer的zoo.cfg文件配置,以及在dataDir目录下新建myid文件,在文件中指定集群id号
分别执行三个zookServer里bin文件下的zkServer.cmd
,zk集群启动完成。
5. zk怎么解决数据一致性
5.1 领导者选举机制
触发机制:
-
集群启动
-
Leader挂
-
Follower 挂掉后 Leader 发现已经没有过半的 Follower 跟随自己了-不能对外提供服务
选举过程:
-
投票
- 先投给自己
- 沟通,pk,改选投票
-
选票
-
投票箱
-
过半机制
1.集群启动:依次启动zk1,zk2,zk3, 流程图如下:
PS:集群过程中,发现数据不一致则会进行回滚or同步。
5.2 2pc机制
注意:client 有整个集群 zookeeper 连接,它并不知道哪个 zk 节点是 lead节点,也无需知道(如果要实现代价太大了,从领域设计角度看,哪个是 lead,客户端是无需关心的),client 读写请求会负载均衡到每个 zk 节点,当写请求负载到 obser、follower节点时,它们会将请求转发到 leader 节点,写请求只能交与 leader 节点处理。
PS:写入数据依靠2pc保证数据一致性
5.3 zk的ap和cp
zk的ap和cp是从不同的角度分析的。
- ap:从一个读写请求分析,保证了可用性(不用阻塞等待全部follwer同步完成),保证不了数据的一致性,所以是 ap。
- cp:从zk架构分析,当leader节点因为网络故障与其他节点过半follower节点失去联系,无法完成数据同步。会触发 leader选举机制,不可用的follower节点将会抛弃,可用节点间重新选举出新 leader;在leader选举期间,会暂停对外提供服务(为啥会暂停,因为zk依赖leader来保证数据一致性),所以丢失了可用性,保证了一致性,即cp。
再细点话,这个c不是强一致性,而是最终一致性。即上面的写案例,数据最终会同步到一致,只是时间问题。
综上,zk广义上来说是cp,狭义上是ap。
6. zk部分源码查看
idea导入zookeeper源码
6.1 idea配置zookeeper集群
- conf目录下新增zoo.cfg、zoo1.cfg、zoo2.cfg
新增dataDir目录,并在目录下新增myid文件,指定zkServer的id
idea配置
其它2个配置分别指定不同的zoo1.cfg、zoo2.cfg即可。
6.2 zk过半机制
我们启动三个服务器,阻塞myid=1的服务10s后才发送ack,看client是否要在10s后能响应,并且10s才能获取数据。
在 SendAckRequestProcessor.processRquest(Request si) 的方法是Follower发送ack的代码,在代码中我们加入如图所示代码。如果myid=1,阻塞10s发送ack
分别启动三个配置:myid=1,myid=2,myid=3的服务。myid=2是leader。
打开cmd,执行zkCli.cmd -server localhost:2181连接myid=1的服务。在10s内我们依次执行如下命令。
create /zhihong 1
get /zhihong
可以看出,即便 myid=1 的 server 阻塞了发送 ack,myid=2 的leader也会发送提交命令,因为 自身+myid=3 的ack超过了一半。
现在我们阻塞 myid=1 and myid=3 的 server 发送ack,情况会是怎样的呢?修改代码如下:
分别启动三个配置:myid=1,myid=2,myid=3的服务,myid=2是leader,cmd连接myid=1的zkServer,cmd执行如下命令:
create /zkserver 1
发送请求后,阻塞了。
myid=1 and myid=3的 zkServer 发送 ack 阻塞了,导致客户端也阻塞。(过半机制)
过半机制:在QuorumMaj类如下方法实现,half是集群所有参与者数量。集群选取leader、ack回复都是采用这个过半机制。
问题:我们知道集群增加 follower节点,可以提高 get 查询的吞吐量。但是对于写请求,是由 leader 基于 2pc 分发给 follower 的,基于ack过半机制,随着 follower越多,需要回复 ack也就越多达到一半以上,才能执行异步提交,这样写请求性能降低了。
解决办法:obser节点。
- 4.异步提交:定位到LearnerHandler。查看lead.processAck 就是异步提交方法。
一直点进去看源码(—>commit(zxid)—>sendPacket(qp)—>f.queuePacket(qp)),我们发现提交请求数据包被添加到queuedPackets队列。而不是直接发送给follower,所以这个提交是异步的。那么从这里可以看出,zk虽然满足cp,但不是保证强一致性的
7. zk集群启动、选举源码流程图
8. zk发布订阅机制
Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据 Watcher 通知状态和事件类型做出业务上的改变。
注册事件:通过这三个操作来绑定事件
- getData
- exists
- getChildren
凡是事务类型的操作,都会触发监听事件。(create/delete/setData)
事件类型
事件类型 | 发生原因 |
---|---|
None(-1) | 客户端连接状态发送变化的时候,会收到none事件 |
NodeCreated(1) | 创建节点的事件。比如create /demo data |
NodeDeleted(2) | 删除节点的事件。比如delete /demo |
NodeDataChanaged(3) | 节点数据发生变更 |
NodeChildrenChanged(4) | 子节点被创建、删除触发事件 |
事件类型对应事件机制
操作类型 | 监听事件 |
---|---|
create(/demo data) | NodeCreated(existis/getData) |
delete(/demo) | NodeDeleted(exists/getData) |
setData(/demo data) | NodeDataChanaged(exists/getData) |
create(/demo/children data) | NodeChildrenChanged(getChild) |
delete(/demo/children) | NodeChildrenChanged(getChild) |
setData(/demo/children data) |
watcher 机制,可以分为四个过程:
- 客户端注册 watcher
- 服务端处理 watcher
- 服务端触发 watcher 事件
- 客户端回调 watcher
watcher机制图解如下
Watcher 事件源码分析
以exists事件分析,zk服务端,客户端代码如下:
@Service
public class ZKClient implements InitializingBean {
private String address = "127.0.0.1:2181";
private ZooKeeper zooKeeper;
public void init() throws Exception {
zooKeeper = new ZooKeeper(address, 1500000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if(Event.KeeperState.SyncConnected==watchedEvent.getState()){
//如果是连接响应
System.out.println(111);
try {
region();
region1();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
public void region() throws Exception {
zooKeeper.exists("/aa", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath());
WatcherEvent wrapper = event.getWrapper();
System.out.println(222222);
}
});
}
public void region1() throws Exception {
zooKeeper.exists("/bb", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event.getPath());
WatcherEvent wrapper = event.getWrapper();
System.out.println(333333);
}
});
}
}
客户端注册 watcher(exists为例)
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException
{
final String clientPath = path;
// 校验节点路径
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
// WatchRegistration 只有watcher和path2个属性
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
// 设置请求头
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
// 设置请求体
ExistsRequest request = new ExistsRequest();
// 注册节点的路径
request.setPath(serverPath);
// 如果设置了watcher, 则设置true
request.setWatch(watcher != null);
// 设置响应体
SetDataResponse response = new SetDataResponse();
// 提交请求
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 封装成packet对象
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
synchronized (outgoingQueue) {
// 创建packet对象
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// packet对象 加入到 outgoingQueue队列
outgoingQueue.add(packet);
}
}
// sendThread线程发送数据包 请求服务端
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
packet在outgoingQueue队列里,ClientCnxn的内部类SendThread的run方法把outgoingQueue给类ClientCnxnSocketNio继承ClientCnxnSocket,所以最终ClientCnxnSocketNio负责从outgoingQueue取出packet发送部分数据给服务端。
ClientCnxn的内部类SendThread的run方法的 doTransport()——>ClientCnxnSocketNio.doTranport()——>ClientCnxnSocketNio.doIO()
@Override
public void run() {
// 省略。。。
while (state.isAlive()) {
try {
// 省略。。。
// 客户端socket发送请求
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
// 省略。。。。
}
}
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // 客户端发送 注册监听节点信息
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
// 省略。。。
}
// socket写操作
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
// 从outgoingQueue中取出 packet对象
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
// 请求头、请求体序列化
p.createBB();
}
// 发送给服务端,只发送了节点路径、true等参数。
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// packet 添加到pendingQueue队列
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
服务端处理 watcher
我们直接进入FinalRequestProcessor,找到processRequest方法
public void processRequest(Request request) {
// 省略。。。
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
// 转化 ExistsRequest对象,属性有 path:/aa,watch:true
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('