3. ZooKeeper客户端(一)
阅读原文时间:2023年07月13日阅读:1

ZooKeeper常用客户端有三种:原生客户端、zkClient、curator

项目中使用前,需要导入相关依赖


junit junit 4.12
org.apache.zookeeper zookeeper 3.4.12
com.101tec zkclient 0.10

<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-framework</artifactId>  
    <version>4.0.0</version>  
</dependency>  
<dependency>  
    <groupId>org.apache.curator</groupId>  
    <artifactId>curator-recipes</artifactId>  
    <version>4.0.0</version>  
</dependency>  

创建会话

不使用监听

public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
@Test
public void createSession2() throws IOException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null);
System.out.println("zk.getState() = " + zk.getState());
}
}


zk.getState() = CONNECTING 

通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况

使用监听

public class TestCreateSession {
/*服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*倒计时器*/
private CountDownLatch latch = new CountDownLatch(1);
@Test
public void createSession() throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/
latch.countDown();
}
}
});
latch.await();
System.out.println("zk.getState() = " + zk.getState());
}
}


zk.getState() = CONNECTED

使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作

客户端基本操作

public class TestJavaApi implements Watcher {
/*zk服务地址*/
private static final String ZK_SERVER = "127.0.0.1:2181";
/*会话连接超时时间*/
private static final int SESSION_TIMEOUT = 50000;
/*指定目录【节点】*/
private static final String ZK_PATH = "/zkDir";
/*客户端连接会话*/
private ZooKeeper zk = null;

 /\*倒计时器\*/  
 private CountDownLatch latch = new CountDownLatch(1);  
 /\*\*  
  \* 事件被触发时的动作  
  \* @param event 事件  
  \*/  
 @Override  
 public void process(WatchedEvent event) {  
     System.out.println("收到事件通知:" + zk.getState() +"\\n");  
     if (event.getState() == Event.KeeperState.SyncConnected){  
         latch.countDown();  
     }  
 }

 /\*\*  
  \* 创建zk会话连接  
  \* @param connectString     zk服务器地址列表,可以是"地址1,地址2,...."  
  \* @param sessionTimeout    Session超时时间  
  \*/  
 public void createZkSession(String connectString, int sessionTimeout){  
     try {  
         zk = new ZooKeeper(connectString,sessionTimeout,this);  
         latch.await();  
         System.out.println("zk.getState() = " + zk.getState());  
     } catch (IOException|InterruptedException e) {  
         System.out.println("连接创建失败");  
         e.printStackTrace();  
     }  
 }

 /\*\*  
  \* 关闭zk会话  
  \*/  
 public void releaseSession(){  
     try {  
         zk.close();  
     } catch (InterruptedException e) {  
         e.printStackTrace();  
     }  
 }

 /\*\*  
  \* 创建节点【目录、文件】  
  \* @param path  节点  
  \* @param data  节点数据  
  \* @return  
  \*/  
 public boolean createNode(String path,String data){  
     try {  
         String node = zk.create(path/\*节点path\*/,  
                 data.getBytes()/\*节点数据\*/,  
                 ZooDefs.Ids.OPEN\_ACL\_UNSAFE/\*权限控制  OPEN\_ACL\_UNSAFE相当于world:anyone\*/,  
                 CreateMode.EPHEMERAL)/\*临时节点\*/;  
         System.out.println("节点创建成功,node = " + node);  
         return true;  
     } catch (KeeperException|InterruptedException e) {  
         System.out.println("节点创建失败");  
         e.printStackTrace();  
     }  
     return false;  
 }

 /\*\*  
  \* 获取节点数据  
  \* @param path  节点路径  
  \* @return  
  \*/  
 public String readNode(String path){  
     try {  
         byte\[\] data = zk.getData(path, true, null);  
         String nodeData = new String(data,"utf-8");  
         //System.out.println("获取"+path+"节点数据:"+nodeData);  
         return nodeData;  
     } catch (KeeperException | InterruptedException | UnsupportedEncodingException e) {  
         e.printStackTrace();  
         return null;  
     }  
 }

 /\*\*  
  \* 修改节点数据  
  \* @param path      节点path  
  \* @param newData   节点新数据  
  \* @return  
  \*/  
 public boolean writeNode(String path,String newData){  
     try {  
         Stat stat = zk.setData(path, newData.getBytes(), -1);  
         System.out.println("节点\["+path+"\]修改成功");  
         return true;  
     } catch (KeeperException|InterruptedException e) {  
         e.printStackTrace();  
     }  
     return false;  
 }

 /\*\*  
  \* 删除指定节点  
  \* @param path  节点path  
  \*/  
 public void deleteNode(String path){  
     try {  
         zk.delete(path,-1);  
         System.out.println("节点\["+path+"\]删除成功");  
     } catch (InterruptedException|KeeperException e) {  
         System.out.println("节点\["+path+"\]删除失败");  
         e.printStackTrace();  
     }  
 }

 public static void main(String\[\] args) {  
     TestJavaApi api = new TestJavaApi();  
     api.createZkSession(ZK\_SERVER,SESSION\_TIMEOUT);  
     if(api.createNode(ZK\_PATH,"初始节点内容")){  
         System.out.println("第一次读"+ZK\_PATH+"节点数据:"+api.readNode(ZK\_PATH));  
         api.writeNode(ZK\_PATH,"修改ZK\_PATH节点数据");  
         System.out.println("第二次读"+ZK\_PATH+"节点数据:"+api.readNode(ZK\_PATH));  
         api.deleteNode(ZK\_PATH);  
     }  
     api.releaseSession();  
 }  

}
/**
************输出结果***********
收到事件通知:CONNECTED

 zk.getState() = CONNECTED  
 节点创建成功,node = /zkDir  
 第一次读/zkDir节点数据:初始节点内容  
 收到事件通知:CONNECTED

 节点\[/zkDir\]修改成功  
 第二次读/zkDir节点数据:修改ZK\_PATH节点数据  
 收到事件通知:CONNECTED

 节点\[/zkDir\]删除成功  

*/

watch机制

public class ZkWatcher implements Watcher {
private static final String ZK_SERVER = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 15000;
private static final String PARENT_PATH ="/testWatcher";
private static final String CHILDREN_PATH = "/testWatcher/children";
private ZooKeeper zk = null;
/*定义原子变量,用于计算进入监听的次数*/
private static AtomicInteger seq = new AtomicInteger();
/*会话进入标志*/
private static final String LOG_PREFIX_OF_MAIN = "【main】";

 /\*倒计时器\*/  
 private CountDownLatch latch = new CountDownLatch(1);  
 @Override  
 public void process(WatchedEvent event) {  
     System.out.println("\*\*\*\*\*\*\*\*\*\*\*\*\*\*进入process方法\*\*\*\*\*\*\*\*\*\*\*\*\*\*");  
     System.out.println("event = " + event);  
     /\*模拟业务连接初始化工作\*/  
     TimeUtils.threadSleep(200);  
     if (event == null) { return; }  
     /\*连接状态\*/  
     Event.KeeperState eventState = event.getState();  
     /\*事件类型\*/  
     Event.EventType eventType = event.getType();  
     /\*受影响的路径\*/  
     String eventPath = event.getPath();  
     /\*进入监听标志\*/  
     String logPreFix = "【watcher-"+seq.incrementAndGet()+"】";  
     System.out.println(logPreFix + "收到watcher通知");  
     System.out.println(logPreFix + "连接状态:\\t"+eventState.toString());  
     System.out.println(logPreFix + "事件类型:\\t"+eventType.toString());

     if(Event.KeeperState.SyncConnected == eventState){  
         if (Event.EventType.None == eventType){/\*成功连接上ZK服务器\*/  
             System.out.println(logPreFix + "成功连接上ZK服务器");  
             latch.countDown();  
         }else if (Event.EventType.NodeCreated == eventType){/\*创建节点\*/  
             System.out.println(logPreFix + "创建节点");  
             TimeUtils.threadSleep(100);  
             /\*使用监听\*/  
             exist(eventPath,true);  
         }else if (Event.EventType.NodeChildrenChanged == eventType){  
             System.out.println(logPreFix + "子节点变更");  
             TimeUtils.threadSleep(1000);  
             System.out.println(logPreFix + "子节点列表:" + getChildren(eventPath,true));  
         }else if (Event.EventType.NodeDataChanged == eventType){  
             System.out.println(logPreFix + "修改节点数据");  
             TimeUtils.threadSleep(100);  
             System.out.println(logPreFix + "修改后节点内容:" + readNode(eventPath, true));  
         }else if (Event.EventType.NodeDeleted == eventType){  
             System.out.println(logPreFix + "删除节点");  
             System.out.println(logPreFix + "节点 " + eventPath + " 被删除");  
         }  
     }else if(Event.KeeperState.Disconnected == eventState){  
         System.out.println(logPreFix + "与zk服务器断开连接");  
     }else if(Event.KeeperState.AuthFailed == eventState){  
         System.out.println(logPreFix + "验证失败");  
     }else if(Event.KeeperState.Expired == eventState){  
         System.out.println(logPreFix + "会话超时");  
     }  
     System.out.println("----------------------------------------");  
 }  
 /\*\*  
  \* 创建ZK连接  
  \* @param connectAddr ZK服务器地址列表  
  \* @param sessionTimeout Session超时时间  
  \*/  
 public void createConnection(String connectAddr, int sessionTimeout) {  
     this.releaseConnection();  
     try {  
         zk = new ZooKeeper(connectAddr, sessionTimeout, this);  
         System.out.println(LOG\_PREFIX\_OF\_MAIN + "开始连接zk服务器");  
         latch.await();  
     } catch (Exception e) {  
         e.printStackTrace();  
     }  
 }

 /\*\*  
  \* 关闭ZK连接  
  \*/  
 public void releaseConnection() {  
     if (this.zk != null) {  
         try {  
             this.zk.close();  
         } catch (InterruptedException e) {  
             e.printStackTrace();  
         }  
     }  
 }

 /\*\*  
  \* 创建节点  
  \* @param path 节点路径  
  \* @param data 数据内容  
  \* @return  
  \*/  
 public boolean createPath(String path, String data) {  
     try {/\*设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)\*/  
         zk.exists(path, true);  
         System.out.println(LOG\_PREFIX\_OF\_MAIN + "节点创建成功, Path: " +  
                 this.zk.create(    /\*路径\*/  
                         path,/\*数据\*/  
                         data.getBytes(),/\*所有可见\*/  
                         ZooDefs.Ids.OPEN\_ACL\_UNSAFE,/\*永久存储\*/  
                         CreateMode.PERSISTENT ) +  
                 ", content: " + data);  
     } catch (Exception e) {  
         e.printStackTrace();  
         return false;  
     }  
     return true;  
 }

 /\*\*  
  \* 删除所有节点  
  \*/  
 public void deleteAllTestPath() {  
     if(this.exist(CHILDREN\_PATH, false) != null){  
         this.deleteNode(CHILDREN\_PATH);  
     }  
     if(this.exist(PARENT\_PATH, false) != null){  
         this.deleteNode(PARENT\_PATH);  
     }  
 }

 /\*\*  
  \* 删除指定节点  
  \* @param path  
  \*/  
 public void deleteNode(String path) {  
     try {  
         zk.delete(path,-1);  
         System.out.println(LOG\_PREFIX\_OF\_MAIN + "删除节点成功,path:" + path);  
     } catch (InterruptedException|KeeperException e) {  
         e.printStackTrace();  
     }  
 }

 /\*\*  
  \* 获取节点内容  
  \* @param path  
  \* @param needWatch  
  \* @return  
  \*/  
 public String readNode(String path, boolean needWatch) {  
     try {  
         byte\[\] data = zk.getData(path, needWatch, null);  
         return new String(data,"utf-8");  
     } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {  
         e.printStackTrace();  
         return null;  
     }  
 }

 /\*\*  
  \* 获取指定节点的子节点列表  
  \* @param path  
  \* @param needWatch  
  \* @return  
  \*/  
 public List<String> getChildren(String path, boolean needWatch) {  
     try {  
         return this.zk.getChildren(path, needWatch);  
     } catch (KeeperException|InterruptedException e) {  
         e.printStackTrace();  
         return null;  
     }  
 }  
 /\*\*  
  \* 更新指定节点数据内容  
  \* @param path 节点路径  
  \* @param data 数据内容  
  \* @return  
  \*/  
 public boolean writeNode(String path, String data) {  
     try {  
         System.out.println(LOG\_PREFIX\_OF\_MAIN + "更新数据成功,path:" + path + ", stat: " +  
                 this.zk.setData(path, data.getBytes(), -1));  
     } catch (Exception e) {  
         e.printStackTrace();  
     }  
     return false;  
 }  
 /\*\*  
  \* path节点是否存在  
  \* @param path  
  \* @param needWatch  
  \* @return  
  \*/  
 public Stat exist(String path, boolean needWatch) {  
     try {  
         return zk.exists(path,needWatch);  
     } catch (KeeperException|InterruptedException e) {  
         e.printStackTrace();  
         return null;  
     }  
 }

 public static void main(String\[\] args) throws Exception {  
     //建立watcher  
     ZkWatcher watcher = new ZkWatcher();  
     //创建连接  
     watcher.createConnection(ZK\_SERVER, SESSION\_TIMEOUT);  
     //System.out.println(zkWatch.zk.toString());  
     Thread.sleep(1000);  
     // 清理节点  
     watcher.deleteAllTestPath();  
     if (watcher.createPath(PARENT\_PATH, System.currentTimeMillis() + "")) {  
         System.out.println("---------------------- read parent ----------------------------");  
         /\*  
         读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。  
         watch是一次性的,也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。  
         \*/  
         watcher.readNode(PARENT\_PATH, true);  
         watcher.writeNode(PARENT\_PATH, System.currentTimeMillis() + "");  
         System.out.println("---------------------- read children path ----------------------------");  
         /\*  
         读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,  
         而不会输出NodeChildrenChanged,也就是说创建子节点时没有watch。  
         如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT\_PATH, ture)只会在  
         创建c1时watch,输出c1的NodeChildrenChanged,而不会输出创建c2时的NodeChildrenChanged,  
         如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,  
         其中path="/p/c1"  
          \*/  
         watcher.getChildren(PARENT\_PATH, true);  
         Thread.sleep(1000);  
         // 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN\_PATH, true)  
         watcher.createPath(CHILDREN\_PATH, System.currentTimeMillis() + "");  
         Thread.sleep(1000);  
         watcher.readNode(CHILDREN\_PATH, true);  
         watcher.writeNode(CHILDREN\_PATH, System.currentTimeMillis() + "");  
     }  
     Thread.sleep(20000);  
     // 清理节点  
     watcher.deleteAllTestPath();  
     Thread.sleep(1000);  
     watcher.releaseConnection();  
 }  

}

class TimeUtils{
public static void threadSleep(long mills){
try {
Thread.sleep(mills);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

/*
*********输出结果********
【main】开始连接zk服务器
**************进入process方法**************
event = WatchedEvent state:SyncConnected type:None path:null
【watcher-1】收到watcher通知
【watcher-1】连接状态: SyncConnected
【watcher-1】事件类型: None
【watcher-1】成功连接上ZK服务器


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher
【main】节点创建成功, Path: /testWatcher, content: 1567510219582
---------------------- read parent ----------------------------
【main】更新数据成功,path:/testWatcher, stat: 223,224,1567510219588,1567510219598,1,0,0,0,13,0,223

---------------------- read children path ----------------------------
【watcher-2】收到watcher通知
【watcher-2】连接状态: SyncConnected
【watcher-2】事件类型: NodeCreated
【watcher-2】创建节点


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher
【watcher-3】收到watcher通知
【watcher-3】连接状态: SyncConnected
【watcher-3】事件类型: NodeDataChanged
【watcher-3】修改节点数据
【watcher-3】修改后节点内容:1567510219598


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher/children
【main】节点创建成功, Path: /testWatcher/children, content: 1567510220605
【watcher-4】收到watcher通知
【watcher-4】连接状态: SyncConnected
【watcher-4】事件类型: NodeCreated
【watcher-4】创建节点


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-5】收到watcher通知
【watcher-5】连接状态: SyncConnected
【watcher-5】事件类型: NodeChildrenChanged
【watcher-5】子节点变更
【main】更新数据成功,path:/testWatcher/children, stat: 225,226,1567510220606,1567510221615,1,0,0,0,13,0,225

【watcher-5】子节点列表:[children]


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher/children
【watcher-6】收到watcher通知
【watcher-6】连接状态: SyncConnected
【watcher-6】事件类型: NodeDataChanged
【watcher-6】修改节点数据
【watcher-6】修改后节点内容:1567510221615


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeDeleted path:/testWatcher/children
【main】删除节点成功,path:/testWatcher/children
【main】删除节点成功,path:/testWatcher
【watcher-7】收到watcher通知
【watcher-7】连接状态: SyncConnected
【watcher-7】事件类型: NodeDeleted
【watcher-7】删除节点
【watcher-7】节点 /testWatcher/children 被删除


**************进入process方法**************
event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher
【watcher-8】收到watcher通知
【watcher-8】连接状态: SyncConnected
【watcher-8】事件类型: NodeChildrenChanged
【watcher-8】子节点变更

*/

ZooKeeper认证机制

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章