ZooKeeper常用客户端有三种:原生客户端、zkClient、curator
项目中使用前,需要导入相关依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
@Test
public void createSession2() throws IOException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
System.out.println("zk.getState() = " + zk.getState());
}
}
zk.getState() = CONNECTING
通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况
public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
@Test
public void createSession() throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
latch.countDown();
}
}
});
latch.await();
System.out.println("zk.getState() = " + zk.getState());
}
}
zk.getState() = CONNECTED
使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作
public class TestJavaApi implements Watcher {
/*zk服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*会话连接超时时间*/
private static final int SESSION_TIMEOUT = 50000;
/*指定目录【节点】*/
private static final String ZK_PATH = "/zkDir";
/*客户端连接会话*/
private ZooKeeper zk = null;
/\*倒计时器\*/
private CountDownLatch latch = new CountDownLatch(1);
/\*\*
\* 事件被触发时的动作
\* @param event 事件
\*/
@Override
public void process(WatchedEvent event) {
System.out.println("收到事件通知:" + zk.getState() +"\\n");
if (event.getState() == Event.KeeperState.SyncConnected){
latch.countDown();
}
}
/\*\*
\* 创建zk会话连接
\* @param connectString zk服务器地址列表,可以是"地址1,地址2,...."
\* @param sessionTimeout Session超时时间
\*/
public void createZkSession(String connectString, int sessionTimeout){
try {
zk = new ZooKeeper(connectString,sessionTimeout,this);
latch.await();
System.out.println("zk.getState() = " + zk.getState());
} catch (IOException|InterruptedException e) {
System.out.println("连接创建失败");
e.printStackTrace();
}
}
/\*\*
\* 关闭zk会话
\*/
public void releaseSession(){
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/\*\*
\* 创建节点【目录、文件】
\* @param path 节点
\* @param data 节点数据
\* @return
\*/
public boolean createNode(String path,String data){
try {
String node = zk.create(path/\*节点path\*/,
data.getBytes()/\*节点数据\*/,
ZooDefs.Ids.OPEN\_ACL\_UNSAFE/\*权限控制 OPEN\_ACL\_UNSAFE相当于world:anyone\*/,
CreateMode.EPHEMERAL)/\*临时节点\*/;
System.out.println("节点创建成功,node = " + node);
return true;
} catch (KeeperException|InterruptedException e) {
System.out.println("节点创建失败");
e.printStackTrace();
}
return false;
}
/\*\*
\* 获取节点数据
\* @param path 节点路径
\* @return
\*/
public String readNode(String path){
try {
byte\[\] data = zk.getData(path, true, null);
String nodeData = new String(data,"utf-8");
//System.out.println("获取"+path+"节点数据:"+nodeData);
return nodeData;
} catch (KeeperException | InterruptedException | UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
/\*\*
\* 修改节点数据
\* @param path 节点path
\* @param newData 节点新数据
\* @return
\*/
public boolean writeNode(String path,String newData){
try {
Stat stat = zk.setData(path, newData.getBytes(), -1);
System.out.println("节点\["+path+"\]修改成功");
return true;
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
}
return false;
}
/\*\*
\* 删除指定节点
\* @param path 节点path
\*/
public void deleteNode(String path){
try {
zk.delete(path,-1);
System.out.println("节点\["+path+"\]删除成功");
} catch (InterruptedException|KeeperException e) {
System.out.println("节点\["+path+"\]删除失败");
e.printStackTrace();
}
}
public static void main(String\[\] args) {
TestJavaApi api = new TestJavaApi();
api.createZkSession(ZK\_SERVER,SESSION\_TIMEOUT);
if(api.createNode(ZK\_PATH,"初始节点内容")){
System.out.println("第一次读"+ZK\_PATH+"节点数据:"+api.readNode(ZK\_PATH));
api.writeNode(ZK\_PATH,"修改ZK\_PATH节点数据");
System.out.println("第二次读"+ZK\_PATH+"节点数据:"+api.readNode(ZK\_PATH));
api.deleteNode(ZK\_PATH);
}
api.releaseSession();
}
}
/**
************输出结果***********
收到事件通知:CONNECTED
zk.getState() = CONNECTED
节点创建成功,node = /zkDir
第一次读/zkDir节点数据:初始节点内容
收到事件通知:CONNECTED
节点\[/zkDir\]修改成功
第二次读/zkDir节点数据:修改ZK\_PATH节点数据
收到事件通知:CONNECTED
节点\[/zkDir\]删除成功
*/
public class ZkWatcher implements Watcher {
private static final String ZK_SERVER = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 15000;
private static final String PARENT_PATH ="/testWatcher";
private static final String CHILDREN_PATH = "/testWatcher/children";
private ZooKeeper zk = null;
/*定义原子变量,用于计算进入监听的次数*/
private static AtomicInteger seq = new AtomicInteger();
/*会话进入标志*/
private static final String LOG_PREFIX_OF_MAIN = "【main】";
/\*倒计时器\*/
private CountDownLatch latch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
System.out.println("\*\*\*\*\*\*\*\*\*\*\*\*\*\*进入process方法\*\*\*\*\*\*\*\*\*\*\*\*\*\*");
System.out.println("event = " + event);
/\*模拟业务连接初始化工作\*/
TimeUtils.threadSleep(200);
if (event == null) { return; }
/\*连接状态\*/
Event.KeeperState eventState = event.getState();
/\*事件类型\*/
Event.EventType eventType = event.getType();
/\*受影响的路径\*/
String eventPath = event.getPath();
/\*进入监听标志\*/
String logPreFix = "【watcher-"+seq.incrementAndGet()+"】";
System.out.println(logPreFix + "收到watcher通知");
System.out.println(logPreFix + "连接状态:\\t"+eventState.toString());
System.out.println(logPreFix + "事件类型:\\t"+eventType.toString());
if(Event.KeeperState.SyncConnected == eventState){
if (Event.EventType.None == eventType){/\*成功连接上ZK服务器\*/
System.out.println(logPreFix + "成功连接上ZK服务器");
latch.countDown();
}else if (Event.EventType.NodeCreated == eventType){/\*创建节点\*/
System.out.println(logPreFix + "创建节点");
TimeUtils.threadSleep(100);
/\*使用监听\*/
exist(eventPath,true);
}else if (Event.EventType.NodeChildrenChanged == eventType){
System.out.println(logPreFix + "子节点变更");
TimeUtils.threadSleep(1000);
System.out.println(logPreFix + "子节点列表:" + getChildren(eventPath,true));
}else if (Event.EventType.NodeDataChanged == eventType){
System.out.println(logPreFix + "修改节点数据");
TimeUtils.threadSleep(100);
System.out.println(logPreFix + "修改后节点内容:" + readNode(eventPath, true));
}else if (Event.EventType.NodeDeleted == eventType){
System.out.println(logPreFix + "删除节点");
System.out.println(logPreFix + "节点 " + eventPath + " 被删除");
}
}else if(Event.KeeperState.Disconnected == eventState){
System.out.println(logPreFix + "与zk服务器断开连接");
}else if(Event.KeeperState.AuthFailed == eventState){
System.out.println(logPreFix + "验证失败");
}else if(Event.KeeperState.Expired == eventState){
System.out.println(logPreFix + "会话超时");
}
System.out.println("----------------------------------------");
}
/\*\*
\* 创建ZK连接
\* @param connectAddr ZK服务器地址列表
\* @param sessionTimeout Session超时时间
\*/
public void createConnection(String connectAddr, int sessionTimeout) {
this.releaseConnection();
try {
zk = new ZooKeeper(connectAddr, sessionTimeout, this);
System.out.println(LOG\_PREFIX\_OF\_MAIN + "开始连接zk服务器");
latch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/\*\*
\* 关闭ZK连接
\*/
public void releaseConnection() {
if (this.zk != null) {
try {
this.zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/\*\*
\* 创建节点
\* @param path 节点路径
\* @param data 数据内容
\* @return
\*/
public boolean createPath(String path, String data) {
try {/\*设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)\*/
zk.exists(path, true);
System.out.println(LOG\_PREFIX\_OF\_MAIN + "节点创建成功, Path: " +
this.zk.create( /\*路径\*/
path,/\*数据\*/
data.getBytes(),/\*所有可见\*/
ZooDefs.Ids.OPEN\_ACL\_UNSAFE,/\*永久存储\*/
CreateMode.PERSISTENT ) +
", content: " + data);
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/\*\*
\* 删除所有节点
\*/
public void deleteAllTestPath() {
if(this.exist(CHILDREN\_PATH, false) != null){
this.deleteNode(CHILDREN\_PATH);
}
if(this.exist(PARENT\_PATH, false) != null){
this.deleteNode(PARENT\_PATH);
}
}
/\*\*
\* 删除指定节点
\* @param path
\*/
public void deleteNode(String path) {
try {
zk.delete(path,-1);
System.out.println(LOG\_PREFIX\_OF\_MAIN + "删除节点成功,path:" + path);
} catch (InterruptedException|KeeperException e) {
e.printStackTrace();
}
}
/\*\*
\* 获取节点内容
\* @param path
\* @param needWatch
\* @return
\*/
public String readNode(String path, boolean needWatch) {
try {
byte\[\] data = zk.getData(path, needWatch, null);
return new String(data,"utf-8");
} catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {
e.printStackTrace();
return null;
}
}
/\*\*
\* 获取指定节点的子节点列表
\* @param path
\* @param needWatch
\* @return
\*/
public List<String> getChildren(String path, boolean needWatch) {
try {
return this.zk.getChildren(path, needWatch);
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
return null;
}
}
/\*\*
\* 更新指定节点数据内容
\* @param path 节点路径
\* @param data 数据内容
\* @return
\*/
public boolean writeNode(String path, String data) {
try {
System.out.println(LOG\_PREFIX\_OF\_MAIN + "更新数据成功,path:" + path + ", stat: " +
this.zk.setData(path, data.getBytes(), -1));
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/\*\*
\* path节点是否存在
\* @param path
\* @param needWatch
\* @return
\*/
public Stat exist(String path, boolean needWatch) {
try {
return zk.exists(path,needWatch);
} catch (KeeperException|InterruptedException e) {
e.printStackTrace();
return null;
}
}
public static void main(String\[\] args) throws Exception {
//建立watcher
ZkWatcher watcher = new ZkWatcher();
//创建连接
watcher.createConnection(ZK\_SERVER, SESSION\_TIMEOUT);
//System.out.println(zkWatch.zk.toString());
Thread.sleep(1000);
// 清理节点
watcher.deleteAllTestPath();
if (watcher.createPath(PARENT\_PATH, System.currentTimeMillis() + "")) {
System.out.println("---------------------- read parent ----------------------------");
/\*
读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。
watch是一次性的,也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。
\*/
watcher.readNode(PARENT\_PATH, true);
watcher.writeNode(PARENT\_PATH, System.currentTimeMillis() + "");
System.out.println("---------------------- read children path ----------------------------");
/\*
读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,
而不会输出NodeChildrenChanged,也就是说创建子节点时没有watch。
如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT\_PATH, ture)只会在
创建c1时watch,输出c1的NodeChildrenChanged,而不会输出创建c2时的NodeChildrenChanged,
如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,
其中path="/p/c1"
\*/
watcher.getChildren(PARENT\_PATH, true);
Thread.sleep(1000);
// 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN\_PATH, true)
watcher.createPath(CHILDREN\_PATH, System.currentTimeMillis() + "");
Thread.sleep(1000);
watcher.readNode(CHILDREN\_PATH, true);
watcher.writeNode(CHILDREN\_PATH, System.currentTimeMillis() + "");
}
Thread.sleep(20000);
// 清理节点
watcher.deleteAllTestPath();
Thread.sleep(1000);
watcher.releaseConnection();
}
}
class TimeUtils{
public static void threadSleep(long mills){
try {
Thread.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
*********输出结果********
【main】开始连接zk服务器
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:None path:null
【watcher-1】收到watcher通知
【watcher-1】连接状态: SyncConnected
【watcher-1】事件类型: None
【watcher-1】成功连接上ZK服务器
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher
【main】节点创建成功, Path: /testWatcher, content: 1567510219582
---------------------- read parent ----------------------------
【main】更新数据成功,path:/testWatcher, stat: 223,224,1567510219588,1567510219598,1,0,0,0,13,0,223
---------------------- read children path ----------------------------
【watcher-2】收到watcher通知
【watcher-2】连接状态: SyncConnected
【watcher-2】事件类型: NodeCreated
【watcher-2】创建节点
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher
【watcher-3】收到watcher通知
【watcher-3】连接状态: SyncConnected
【watcher-3】事件类型: NodeDataChanged
【watcher-3】修改节点数据
【watcher-3】修改后节点内容:1567510219598
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher/children
【main】节点创建成功, Path: /testWatcher/children, content: 1567510220605
【watcher-4】收到watcher通知
【watcher-4】连接状态: SyncConnected
【watcher-4】事件类型: NodeCreated
【watcher-4】创建节点
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-5】收到watcher通知
【watcher-5】连接状态: SyncConnected
【watcher-5】事件类型: NodeChildrenChanged
【watcher-5】子节点变更
【main】更新数据成功,path:/testWatcher/children, stat: 225,226,1567510220606,1567510221615,1,0,0,0,13,0,225
【watcher-5】子节点列表:[children]
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher/children
【watcher-6】收到watcher通知
【watcher-6】连接状态: SyncConnected
【watcher-6】事件类型: NodeDataChanged
【watcher-6】修改节点数据
【watcher-6】修改后节点内容:1567510221615
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDeleted path:/testWatcher/children
【main】删除节点成功,path:/testWatcher/children
【main】删除节点成功,path:/testWatcher
【watcher-7】收到watcher通知
【watcher-7】连接状态: SyncConnected
【watcher-7】事件类型: NodeDeleted
【watcher-7】删除节点
【watcher-7】节点 /testWatcher/children 被删除
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-8】收到watcher通知
【watcher-8】连接状态: SyncConnected
【watcher-8】事件类型: NodeChildrenChanged
【watcher-8】子节点变更
*/
手机扫一扫
移动阅读更方便
你可能感兴趣的文章