我是靠谱客的博主 专注香氛,最近开发中收集的这篇文章主要介绍【互联网代理方案】——Zookeeper,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

Zookeeper——数据结构

嵌套表示:模仿文件系统的父子关系,即利用路径/决定父子关系的方式来实现节点的树形数据结构;
在这里插入图片描述
节点类型
(1)永久节点----一般是某个操作的根节点一定是永久节点,用于集成管理所有临时节点;
(2)临时节点----客户端和Zookeeper断开连接后,该节点被删除;统计在线机器,以及实现分布式锁,因为临时节点会话结束另一个就可以获取锁了;
(3)序号节点----会被标记序号存入Zookeeper,比如抢购时候的,先到先得到;
本质概述
zookeeper中就是用路径作为key去找到node对应的数据;所有node的父子关系又有路径嵌套包含去表示;
事件类型(znode节点相关)
①EventType.NodeCreated:节点创建
②EventType.NodeDataChanged:节点数据变更
③EventType.NodeChildrenChanged:子节点变更
④EventType.NodeDeleted:节点删除
状态类型(客户端实例相关):
①KeeperState.Disconnected:未连接
②KeeperState.SyncConnected:已连接
③KeeperState.AuthFailed:认证失败
④KeeperState.Expired:会话失效
Znode类型
①PERSISTENT:持久化目录节点; 客户端与zookeeper断开连接后,该节点依旧存在
②PERSISTENT_SEQUENTIAL:持久化顺序目录编号节点;客户端与zookeeper断开连接后,该节点依旧存在,zookeeper给该节点名称进行顺序编号
③EPHEMERAL-:临时目录节点; 客户端与zookeeper断开连接后,删除该节点
④EPHEMERAL_SEQUENTIAL:临时顺序编号目录节点;zookeeper给该节点名称进行顺序编号,客户端与zookeeper断开连接后,删除该节点

Zookeeper——应用场景(X)

命名服务

直接利用zk,key与value的对应关系,将服务信息注册Zookeeper,命名对应服务器ip地址和其他元数据完成这个功能;

配置管理

多个应用对于zookeeper上的节点进行监听,然后拉取这个数据实现动态刷新应用内的bean

分布式锁

不断的针对在一个key创建节点,不能重复创建(只能有一人获得),且临时节点连接断了就会消失,手动实现,来实现分布式锁

读写锁

1、读写约定法:value一个是write,一个read
2、尾部判断法: 所有读锁下面可以增加节点,所有写锁下面不能增加节点,因此只需要判断尾部是不是写锁即可;
3、监听上一个节点代码:写锁或者读锁监测到最后一个读的时候就添加监听,从效率来讲,如果一个写锁解放后同时通知一千个节点,那么就会造成性能损耗判断太多;因此等待队列中利用链表监听更实际,只监听上一个节点是否成功写入;

Zookeeper——工作原理

集群强一致性原理

Zookeeper内部利用全局事务机制,决定了一个创建节点的请求,要么全部成功,要么全部失败,最后返回给客户端创建节点失败最终强一致性的表现性;
消息广播模式
(1.1)事务咨询:leader所有参与者是否可以参与事务,所有follower通ack回应
(1.2)分发数据:然后生成事务id此时再将同步块数据分发给所有follower,完成同步后再回应给leader
(1.3)决议事务:leader根据所有follower反馈的结果决定此次事务是否成功,不成功则利用id完成回滚或作废,成功则返回创建节点成功
崩溃恢复模式
(2.1)leader选举,leader挂掉后所有首先follower进入looking状态,然后选举新的leader;
(2.2)重启事务,新上任的leader根据事务id先回滚所有follower,然后再重新开始事务流程,完成数据同步并反馈给客户端

集群高可用性原理

Zookeeper各个服务节点在上线时,会利用自身的通讯机制选举出三种角色的节点来保证高可用;
Leader领导者
1、存储所有节点的数据副本,并帮助新加入或重启的子节点恢复数据
2、处理不同类型请求
(1)PING请求是指Learner的心跳信息;
(2)REQUEST请求,是Follower发送的提议请求,包括写请求及同步请求;
(3)ACK请求,是 Follower的对提议的回复,超过半数的Follower通过,则commit该提议;REVALIDATE消息是用来延长SESSION有效时间
Follower跟随者
(1)接收Client的请求,如果为写请求,发送给Leader进行投票;如果为读请求,则进行负载均衡;
(2)接收Leader消息并进行处理,消息循环处理机制;
————(2.1)PROPOSAL消息:Leader发起的提案,要求Follower投票
————(2.2)SYNC消息:返回SYNC结果到客户端,这个消息最初由客户端发起,用来强制得到最新的更新
————(2.3)REVALIDATE消息:根据Leader的REVALIDATE结果,关闭待revalidate的session还是允许其接受消息;
(3)参与新的leader的选举
Observer观察者
Follower太多的时候会导致投票选举影响性能,所以声明部分子节点是观察者则不会导致这个问题;

集群选举原理

zookeeper默认的算法是FastLeaderElection 采用投票数大于半数则胜出;
以下是集群选举的三个因素;
集群选举因素——服务器id:.比如有三台服务器,编号分别为1,2,3;编号越大在选择算法中的权重越大
集群选举因素——数据ID:服务器中存放的最新数据version;值越大说明数据越新,在选举算法中数据越大权重越大;
集群选举因素——逻辑时钟:1.逻辑时钟也叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的;2.每投完一次票这个数据就会增加,然后与接受到的其他服务器返回的投票信息中的数值相比;根据不同的值做出不同的判断;
全新集群选举:一个zookeeper集群刚刚搭建起来,没有任何数据,他的选举,就叫全新集群选举;集群过半时,选择3号为leader;
非全新集群选举:中途有机器down掉,需要重新选举时,选举过程中就需要加入数据id;
(1)逻辑时钟小的选举结果被忽略,重新投票
(2)同一逻辑时钟后,数据id大的胜出
(3)数据id相同的情况下,服务器id大的胜出

Zookeeper——节点管理(Linux)

【基本操作】
进入客户端:bin/zkCli.sh
查看子节点 :ls / ls /crm
历史记录:history
redo 10 执行history中某个的命令、
【增加/删除/更改/监听】
创建节点:
created /zkPro myData(默认是永久节点)
created /zkPro/username myData
created /zkPro/password myData
created -e /zkPro/password (顺序节点)
created -s /zkPro/password (临时节点)
获取节点:get /zkPro
更新节点:set /zkPro zhuge
删除节点:delete /zkPro
监听节点:
get /crm/fuck watach
【限制操作】
限制节点:setquota -n 3 /crm设置子节点最大路径,超过只会警告
查看限制:listquota /crm
删除限制:delquota -n /crm
【节点状态】
通过,get /mynode1 拿到
dataVersion:数据版本号
eversion:子节点版本号,子节点变化时会+1
aclVersion:ACL版本号
cZxid:Znode创建的时候,生成的事务id
mZxid:Znode更新的时候,生成的事务Id
ctime:节点更新时的时间戳
ephemeralOwner:为0则不是临时节点,绑定了客户端的sessionID
dataLength = 0 #节点数据的字节数
numChildren = 1 #子节点的个数

Zookeeper——节点管理(Java)

maven

<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
</dependency>

连接与监听回调

   /**
    * @连接与监听回调
    * 可以根据监听类型、状态、路径来执行回调函数
    */
ZooKeeper zk = new ZooKeeper("192.168.25.8:2181", 3000, new Watcher() {
       @Override
       public void process(WatchedEvent event) {
           String path = event.getPath();                //负责监听的路径
           Event.KeeperState state = event.getState();//负责监听的状态
           /**
            *   None(-1)      成功连接回调
            *   NodeCreated(1)成功创建回调
            *   NodeDeleted(2)成功删除糊掉
            *   NodeDataChanged(3)成功修改回调
            *   NodeChildrenChanged(4)子节点发生变化回调
            */
           Event.EventType type = event.getType();       //负责监听类型
       }
   });

创建节点

 /**
    * @创建节点
    * key
    * 数据
    * 权限
    * 节点类型
    */
   zk.create("/a1", "yangxinlei".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久、
   zk.create("/c1", "yangxinlei123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);//持久序列
   zk.create("/b1", "yangxinlei12".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
   zk.create("/b1", "yangxinlei12".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

删除节点

  /**
    * @删除节点
    * version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
    */
   zk.delete("/a1",-1);

修改节点

 /**
    * @修改节点
    * version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
    * 更新利用版本号实现安全
    */
   zk.setData("/b1","testUpdate".getBytes(),-1);

获取数据

  /**
    * @获取数据,可以注册监听器,,stat可以针对性拿到状态数据
    */
   byte[] data = zk.getData("/c1", true, null);

获取子节点

 /**
    * @获取子节点(同步)
    * fasle标识不需要监听,否则需要放回调函数接受
    */
   List<String> childrenList = zk.getChildren("/test", false);
   /**
    * @获取子节点(异步)
    */
   zk.getChildren("/test", true, new AsyncCallback.Children2Callback() {
       public void processResult(int i, String s, Object o, List<String> list, Stat stat) {

       }
   }, null);

Zookeeper——节点管理(工具类封装)

@Configuration
public class ZookeeperConfig {
    private static final Logger logger = LoggerFactory.getLogger(ZookeeperConfig.class);

    @Value("${zookeeper.address}")
    private    String connectString;

    @Value("${zookeeper.timeout}")
    private  int timeout;


    @Bean(name = "zkClient")
    public ZooKeeper zkClient(){
        ZooKeeper zooKeeper=null;
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            //连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
            //  可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
             zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(Event.KeeperState.SyncConnected==event.getState()){
                        //如果收到了服务端的响应事件,连接成功
                        countDownLatch.countDown();
                    }
                }
            });
            countDownLatch.await();
            logger.info("【初始化ZooKeeper连接状态....】={}",zooKeeper.getState());

        }catch (Exception e){
            logger.error("初始化ZooKeeper连接异常....】={}",e);
        }
         return  zooKeeper;
    }


}
————————————————
版权声明:本文为CSDN博主「codeing_doc」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010391342/article/details/100404588
@Component
public class ZkApi {

    private static final Logger logger = LoggerFactory.getLogger(ZkApi.class);

    @Autowired
    private ZooKeeper zkClient;


    /**
     * 判断指定节点是否存在
     * @param path
     * @param needWatch  指定是否复用zookeeper中默认的Watcher
     * @return
     */
    public Stat exists(String path, boolean needWatch){
        try {
            return zkClient.exists(path,needWatch);
        } catch (Exception e) {
            logger.error("【断指定节点是否存在异常】{},{}",path,e);
            return null;
        }
    }

    /**
     *  检测结点是否存在 并设置监听事件
     *      三种监听类型: 创建,删除,更新
     *
     * @param path
     * @param watcher  传入指定的监听类
     * @return
     */
    public Stat exists(String path,Watcher watcher ){
        try {
            return zkClient.exists(path,watcher);
        } catch (Exception e) {
            logger.error("【断指定节点是否存在异常】{},{}",path,e);
            return null;
        }
    }

    /**
     * 创建持久化节点
     * @param path
     * @param data
     */
    public boolean createNode(String path, String data){
        try {
            zkClient.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
            return true;
        } catch (Exception e) {
            logger.error("【创建持久化节点异常】{},{},{}",path,data,e);
            return false;
        }
    }


    /**
     * 修改持久化节点
     * @param path
     * @param data
     */
    public boolean updateNode(String path, String data){
        try {
            //zk的数据版本是从0开始计数的。如果客户端传入的是-1,则表示zk服务器需要基于最新的数据进行更新。如果对zk的数据节点的更新操作没有原子性要求则可以使用-1.
            //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            zkClient.setData(path,data.getBytes(),-1);
            return true;
        } catch (Exception e) {
            logger.error("【修改持久化节点异常】{},{},{}",path,data,e);
            return false;
        }
    }

    /**
     * 删除持久化节点
     * @param path
     */
    public boolean deleteNode(String path){
        try {
            //version参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version为-1则忽略版本检查
            zkClient.delete(path,-1);
            return true;
        } catch (Exception e) {
            logger.error("【删除持久化节点异常】{},{}",path,e);
            return false;
        }
    }

    /**
      * 获取当前节点的子节点(不包含孙子节点)
      * @param path 父节点path
      */
    public List<String> getChildren(String path) throws KeeperException, InterruptedException{
        List<String> list = zkClient.getChildren(path, false);
        return list;
    }

    /**
     * 获取指定节点的值
     * @param path
     * @return
     */
    public  String getData(String path,Watcher watcher){
        try {
            Stat stat=new Stat();
            byte[] bytes=zkClient.getData(path,watcher,stat);
            return  new String(bytes);
        }catch (Exception e){
            e.printStackTrace();
            return  null;
        }
    }


    /**
     * 测试方法  初始化
     */
    @PostConstruct
    public  void init(){
        String path="/zk-watcher-2";
        logger.info("【执行初始化测试方法。。。。。。。。。。。。】");
        createNode(path,"测试");
        String value=getData(path,new WatcherApi());
        logger.info("【执行初始化测试方法getData返回值。。。。。。。。。。。。】={}",value);

        // 删除节点出发 监听事件
        deleteNode(path);

    }

}

————————————————
版权声明:本文为CSDN博主「codeing_doc」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/u010391342/article/details/100404588

当然了Zookeeper将数据发往java英语的时候,我们也不知道数据时效性,因此zookeeper对外提供version让我们自己利用乐观锁实现这个并发原子性问题

最后

以上就是专注香氛为你收集整理的【互联网代理方案】——Zookeeper的全部内容,希望文章能够帮你解决【互联网代理方案】——Zookeeper所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(58)

评论列表共有 0 条评论

立即
投稿
返回
顶部