通过上篇博文学习了zookeeper原生的api,发现比较繁琐:比如创建连接的时候,要写连接的监听,处理连接失败或过期需要重新连接;对节点的监听触发后需再次注册监听,否则监听只生效一次等问题的存在,期待有个可靠的客户端对原生api进行封装,简化代码量。
Curator是Netflix开源的一套zookeeper客户端框架,用它来操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent风格的代码,非常简洁。
curator是使用最多的zookeeper客户端,已成为apache的一个项目。
1.pom.xml中引用jar
<dependencies>
<!-- 不需要,否则zookeeper的版本和下面curator所引用的zookeeper版本不一致,会导致运行出错
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.9</version>
</dependency> -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.1</version>
</dependency>
</dependencies>
使用curator的版本必须匹配服务器上安装zookeeper的版本,本人使用的是zookeeper的最新稳定版3.4.9(最新不稳定版是3.5.X),所以curator不能使用最新版本,否则创建节点时就会报org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /curator/test01 这错误,官方上有这段话
Versions
The are currently two released versions of Curator, 2.x.x and 3.x.x:
Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc.
所以curator使用2.x.x即可,就用2.x.x中最后一个版本2.11.1
2.连接
package com.fei.zk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryUntilElapsed;
public class CuratorTest01 {
public static void main(String[] args) {
//重试连接策略
RetryPolicy retry = new ExponentialBackoffRetry(1000, 5);//重试5次,每次间隔时间指数增长(有具体增长公式)
RetryPolicy retry1 = new RetryNTimes(5, 5000);//重试5次,每次间隔5秒
RetryPolicy retry2 = new RetryUntilElapsed(60000 * 2, 5000);//重试2分钟,每次间隔5秒
//普通创建
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.0.219:2181")
//如果3秒连接不上客户端,就超时
.connectionTimeoutMs(3000)
//session的有效时间5秒
.sessionTimeoutMs(5000)
//设置重连接策略,重连接5次,每次失败后休息2秒
.retryPolicy(retry)
// 仍然可以继续设计其他属性
//....
.build();
client.start();
System.out.println(client.getState().name());
client.close();
}
}
如注释,创建客户端连接我们通常需要指定重试策略,curator提供了3种重试机制,分别如上;对于fluent风格,就是每个操作都返回了一个对象,我们可以一直通过[.方法名]的方式书写代码;client创建了之后,需要调用start方法才能真正去建立连接。会话超时时间是指当连接发生故障时,由于zk的心跳机制检测,服务端认为会话超时的时间,会清除session;
2.创建
原生api中创建节点时,如果父节点不存在,则无法创建子节点,所以需确保父节点的存在。curator提供了方法,如果父节点不存在,则自动进行创建.
/**
* 创建节点
* @param client
*/
public static void createNode(CuratorFramework client){
String path = "/curator/test01";
String value = "curator-test01";
try {
//同步方式创建
client.create()
.creatingParentsIfNeeded() //如果父节点不存在,则自动创建
.withMode(CreateMode.PERSISTENT) //节点模式,持久
.forPath(path, value.getBytes(defaultCharSet));
// client.create()
// .creatingParentsIfNeeded() //如果父节点不存在,则自动创建
// .withMode(CreateMode.EPHEMERAL) //节点模式,临时
// .inBackground()//后台方式,即异步方式创建
// .forPath(path, value.getBytes(defaultCharSet));
} catch (Exception e) {
e.printStackTrace();
}
}
服务器上通过zkCli.sh登录,查看
[zk: localhost:2181(CONNECTED) 6] get /curator
cZxid = 0x73
ctime = Mon Feb 27 15:48:26 CST 2017
mZxid = 0x73
mtime = Mon Feb 27 15:48:26 CST 2017
pZxid = 0x74
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 7] get /curator/test01
curator-test01
cZxid = 0x74
ctime = Mon Feb 27 15:48:26 CST 2017
mZxid = 0x74
mtime = Mon Feb 27 15:48:26 CST 2017
pZxid = 0x74
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0
[zk: localhost:2181(CONNECTED) 8]
看到父节点也有了,父节点值是空的。
3.修改
public static void update(CuratorFramework client){
try {
String path = "/curator/test01";
String value = "curator-test02";
//先获取节点状态信息
Stat stat = new Stat();
//获取节点值,并同时获取节点状态信息
byte[] data = client.getData().storingStatIn(stat).forPath(path);
//更新节点
client.setData()
.withVersion(stat.getVersion()) //版本校验,与当前版本不一致则更新失败,默认值-1无视版本信息进行更新
// .inBackground(paramBackgroundCallback) //异步修改数据,并进行回调通知
.forPath(path, value.getBytes(defaultCharSet));
data = client.getData().forPath(path);
System.out.println("修改后的值=" + new String(data,defaultCharSet));
} catch (Exception e) {
e.printStackTrace();
}
}
4.删除
public static void delete(CuratorFramework client){
try {
//删除已存在的节点
String path = "/curator/test01";
client.delete()
.guaranteed() //删除失败,则客户端持续删除,直到节点删除为止
.deletingChildrenIfNeeded() //删除相关子节点
.withVersion(-1) //无视版本,直接删除
.forPath(path);
//删除不存在的节点
String path2 = "/curator/test02";
client.delete()
// .guaranteed() //删除失败,则客户端持续删除,直到节点删除为止
.deletingChildrenIfNeeded() //删除相关子节点
.withVersion(-1) //无视版本,直接删除
.forPath(path2);
} catch (Exception e) {
e.printStackTrace();
}
}
测试发现,删除不存在的节点时,会抛出错误,所以删除时,最好先判断节点是否存在
5.检测节点是否存在
public static void checkExist(CuratorFramework client){
try {
String path = "/curator/test01";
Stat stat = client.checkExists().forPath(path);
if(stat == null){
System.out.println("节点" + path + "不存在");
}else{
System.out.println("节点" + path + "已存在");
}
} catch (Exception e) {
e.printStackTrace();
}
}
6.异步操作
上面一些例子也表明了可是有异步方式进行对节点的增删改查,并且进行回调通知.下面给个获取节点数据的异步回调例子。
public static void callBack(CuratorFramework client){
try {
String path = "/curator/test01";
byte[] data = null;
//同步获取
data = client.getData().forPath(path);//同步
System.out.println("同步方式获取节点数据,data=" + (data == null ? "null" : new String(data,defaultCharSet)));
//无回调的异步获取
data = client.getData().inBackground().forPath(path);//无回调的异步
//data可能是空,无法预测
System.out.println("无回调的异步方式获取节点数据,data=" + (data == null ? "null" : new String(data,defaultCharSet)));
//有回调通知的异步获取
//异步操作线程池,避免线程过多,给服务器造成影响。。。。使用线程池,要考虑线程池的关闭(这里省略)
ExecutorService es = Executors.newFixedThreadPool(5);
data = client.getData().inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
CuratorEventType c = curatorEvent.getType();//事件类型,可在CuratorEventType看到具体种类
int r = curatorEvent.getResultCode();//0,执行成功,其它,执行失败
Object o = curatorEvent.getContext();//事件上下文,一般是由调用方法传入,供回调函数使用的参数
String p = curatorEvent.getPath();//节点路径
List<String> li = curatorEvent.getChildren();//子节点列表
byte[] _data = curatorEvent.getData();//节点数据
System.out.println("回调通知,data=" + (_data == null ? "null" : new String(_data,defaultCharSet)));
}
},es).forPath(path);//有回调的异步
System.out.println("有回调的异步方式获取节点数据,data=" + (data == null ? "null" : new String(data,defaultCharSet)));
} catch (Exception e) {
e.printStackTrace();
}
}
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
STARTED
同步方式获取节点数据,data=curator-test01
无回调的异步方式获取节点数据,data=null
有回调的异步方式获取节点数据,data=null
回调通知,data=curator-test01
看日志可发现,同步方式,会让主线程阻塞等待服务器返回数据;异步方式主线程不等待服务器返回数据,就直接往下执行了。
7.节点监控
在上篇博文学习时,发现原生api,对节点的监控是一次性的,当触发后,就不再起作用了。curator可以一直生效。
/**
* 节点监听需要用repices包中的NodeCache来完成
* 子节点的监听需要用PathChildrenCache来完成
* @param client
*/
public static void nodeMonitor(CuratorFramework client){
try {
//节点监听需要用repices包中的NodeCache来完成
final String path = "/curator/test01";
final NodeCache cache = new NodeCache(client,path);
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
byte[] ret = cache.getCurrentData().getData();
System.out.println("当前节点" + path +"=:"+ new String(ret));
}
});
// cache.close();
//在父节点进行监听
final String pPath = "/curator";
PathChildrenCache pcCache = new PathChildrenCache(client,pPath,true);
pcCache.start();
pcCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
throws Exception {
switch (pathChildrenCacheEvent.getType()){//子节点的事件类型
//通过pathChildrenCacheEvent,可以获取到节点相关的数据
case CHILD_ADDED:
System.out.println("增加节点" + pathChildrenCacheEvent.getData().getPath()
+ "=" +new String(pathChildrenCacheEvent.getData().getData(),defaultCharSet) );
break;
case CHILD_REMOVED:
System.out.println("删除节点"+pathChildrenCacheEvent.getData().getPath());
break;
case CHILD_UPDATED:
System.out.println("更新节点" + pathChildrenCacheEvent.getData().getPath()
+ "=" +new String(pathChildrenCacheEvent.getData().getData(),defaultCharSet) );
break;
default:
break;
}
}
});
// pcCache.close();
} catch (Exception e) {
e.printStackTrace();
}
}
在服务器zkCli.sh连接服务器,进行修改、新增操作。代码日志
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
STARTED
增加节点/curator/test01=curator-test01
当前节点/curator/test01=:curator-test01
更新节点/curator/test01=test0002
当前节点/curator/test01=:test0002
更新节点/curator/test01=test003
当前节点/curator/test01=:test003
增加节点/curator/test02=test02
注意:如果cache.close(),则不会再监听了,同时第一次启动的时候,如果节点存在或子节点存在,也会立即触发一次监听。服务器kill杀掉zookeeper进程,然后./zkServer.sh start 启动,代码客户端会自动重新连接(连接策略生效范围内)。./zkCli.sh 连接zk,对/curator增加子节点或修改子节点的值,发现代码客户端还能触发监控.
8事务
对多个节点进行操作,要么都成功,要么都失败。
public static void tansation(CuratorFramework client){
try {
client.inTransaction()
.create().withMode(CreateMode.EPHEMERAL).forPath("/t01","ttt".getBytes(defaultCharSet))
.and()
.setData().withVersion(-1).forPath("/conf/curator/test01", "fei002".getBytes(defaultCharSet))
.and()
.delete().withVersion(-1).forPath("/curator/test01")
.and()
.commit();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
最后
以上就是甜美香菇最近收集整理的关于zookeeper学习4之curator开源的zookeeper客户端的全部内容,更多相关zookeeper学习4之curator开源内容请搜索靠谱客的其他文章。
发表评论 取消回复