五、Zookeeper基于API操作Node节点
阅读原文时间:2023年07月09日阅读:3

安装zookeeper :linux下安装Zookeeper 3.4.14

zookeeper 分为5个包:

  1. org.apache.zookeeper //客户端主要类文件

  2. org.apache.zookeeper.data //

  3. org.apache.zookeeper.server

  4. org.apache.zookeeper.server.quorum

  5. org.apache.zookeeper.server.upgrade

引入依赖:

org.apache.zookeeper zookeeper 3.4.14

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateSession implements Watcher {

private  static CountDownLatch countDownLatch = new CountDownLatch(1);

/\*  
  建立会话  
 \*/  
public static void main(String\[\] args) throws IOException, InterruptedException {

 /\*  
    客户端可以通过创建一个zk实例来连接zk服务器  
    new Zookeeper(connectString,sesssionTimeOut,Wather)  
    connectString: 连接地址:IP:端口  
    sesssionTimeOut:会话超时时间:单位毫秒  
    Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)  
 \*/

    ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());  
    System.out.println(zooKeeper.getState());

    // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞  
    countDownLatch.await();

    System.out.println("客户端与服务端会话真正建立了");

}

/\*  
    回调方法:处理来自服务器端的watcher通知  
 \*/  
public void process(WatchedEvent watchedEvent) {  
    // SyncConnected  
    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

        //解除主程序在CountDownLatch上的等待阻塞  
        System.out.println("process方法执行了...");  
        countDownLatch.countDown();

    }

}  

}

注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话

创建节点

节点介绍:

/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
*         ANYONE_ID_UNSAFE : 表示任何人
*         AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
*         OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
*         CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
*        PERSISTENT:持久节点
*        PERSISTENT_SEQUENTIAL:持久顺序节点
*        EPHEMERAL:临时节点
*        EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateNote implements Watcher {

private  static CountDownLatch countDownLatch = new CountDownLatch(1);

private static ZooKeeper zooKeeper;

/\*  
  建立会话  
 \*/  
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {

 /\*  
    客户端可以通过创建一个zk实例来连接zk服务器  
    new Zookeeper(connectString,sesssionTimeOut,Wather)  
    connectString: 连接地址:IP:端口  
    sesssionTimeOut:会话超时时间:单位毫秒  
    Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)  
 \*/

     zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());  
    System.out.println(zooKeeper.getState());

    // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞  
    //countDownLatch.await();\\  
    Thread.sleep(Integer.MAX\_VALUE);

}

/\*  
    回调方法:处理来自服务器端的watcher通知  
 \*/  
public void process(WatchedEvent watchedEvent) {  
    // SyncConnected  
    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

        //解除主程序在CountDownLatch上的等待阻塞  
        System.out.println("process方法执行了...");  
        // 创建节点  
        try {  
            createNoteSync();  
        } catch (KeeperException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }

    }

}

/\*  
   创建节点的方法  

*/
private static void createNoteSync() throws KeeperException, InterruptedException {

    /\*\*  
     \*  path        :节点创建的路径  
     \*  data\[\]      :节点创建要保存的数据,是个byte类型的  
     \*  acl         :节点创建的权限信息(4种类型)  
     \*                 ANYONE\_ID\_UNSAFE    : 表示任何人  
     \*                 AUTH\_IDS    :此ID仅可用于设置ACL。它将被客户机验证的ID替换。  
     \*                 OPEN\_ACL\_UNSAFE    :这是一个完全开放的ACL(常用)--> world:anyone  
     \*                 CREATOR\_ALL\_ACL  :此ACL授予创建者身份验证ID的所有权限  
     \*  createMode    :创建节点的类型(4种类型)  
     \*                  PERSISTENT:持久节点  
     \*                    PERSISTENT\_SEQUENTIAL:持久顺序节点  
     \*                  EPHEMERAL:临时节点  
     \*                  EPHEMERAL\_SEQUENTIAL:临时顺序节点  
     String node = zookeeper.create(path,data,acl,createMode);  
     \*/

    // 持久节点  
    String note\_persistent = zooKeeper.create("/g-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.PERSISTENT);

    // 临时节点  
    String note\_ephemeral = zooKeeper.create("/g-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.EPHEMERAL);

    // 持久顺序节点  
    String note\_persistent\_sequential = zooKeeper.create("/g-persistent\_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.PERSISTENT\_SEQUENTIAL);

    System.out.println("创建的持久节点" + note\_persistent);  
    System.out.println("创建的临时节点" + note\_ephemeral);  
    System.out.println("创建的持久顺序节点" + note\_persistent\_sequential);

}  

}

读取节点:

public class GetNoteData implements Watcher {

private  static CountDownLatch countDownLatch = new CountDownLatch(1);

private static ZooKeeper zooKeeper;

/\*  
  建立会话  
 \*/  
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {

 /\*  
    客户端可以通过创建一个zk实例来连接zk服务器  
    new Zookeeper(connectString,sesssionTimeOut,Wather)  
    connectString: 连接地址:IP:端口  
    sesssionTimeOut:会话超时时间:单位毫秒  
    Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)  
 \*/

     zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());  
    System.out.println(zooKeeper.getState());

    // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞  
    //countDownLatch.await();\\  
    Thread.sleep(Integer.MAX\_VALUE);

}

/\*  
    回调方法:处理来自服务器端的watcher通知  
 \*/  
public void process(WatchedEvent watchedEvent) {

    /\*  
        子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知  
        要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听  
     \*/  
    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){

        List<String> children = null;  
        try {  
            children = zooKeeper.getChildren("/g-persistent", true);  
        } catch (KeeperException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
        System.out.println(children);

    }

    // SyncConnected  
    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

        //解除主程序在CountDownLatch上的等待阻塞  
        System.out.println("process方法执行了...");

        // 获取节点数据的方法  
        try {  
            getNoteData();

            // 获取节点的子节点列表方法  
            getChildrens();  
        } catch (KeeperException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }

    }

}

/\*  
    获取某个节点的内容  
 \*/  
private void getNoteData() throws KeeperException, InterruptedException {

    /\*\*  
     \* path    : 获取数据的路径  
     \* watch    : 是否开启监听  
     \* stat    : 节点状态信息  
     \*        null: 表示获取最新版本的数据  
     \*  zk.getData(path, watch, stat);  
     \*/  
    byte\[\] data = zooKeeper.getData("/g-persistent", false, null);  
    System.out.println(new String(data));

}

/\*  
    获取某个节点的子节点列表方法  
 \*/  
public static void getChildrens() throws KeeperException, InterruptedException {

    /\*  
        path:路径  
        watch:是否要启动监听,当子节点列表发生变化,会触发监听  
        zooKeeper.getChildren(path, watch);  
     \*/  
    List<String> children = zooKeeper.getChildren("/g-persistent", true);  
    System.out.println(children);

}

}

删除节点

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class DeleteNote implements Watcher {

private  static CountDownLatch countDownLatch = new CountDownLatch(1);

private static ZooKeeper zooKeeper;

/\*  
  建立会话  
 \*/  
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {

 /\*  
    客户端可以通过创建一个zk实例来连接zk服务器  
    new Zookeeper(connectString,sesssionTimeOut,Wather)  
    connectString: 连接地址:IP:端口  
    sesssionTimeOut:会话超时时间:单位毫秒  
    Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)  
 \*/

     zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());  
    System.out.println(zooKeeper.getState());

    // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞  
    //countDownLatch.await();\\  
    Thread.sleep(Integer.MAX\_VALUE);

}

/\*  
    回调方法:处理来自服务器端的watcher通知  
 \*/  
public void process(WatchedEvent watchedEvent) {  
    // SyncConnected  
    if(watchedEvent.getState() == Event.KeeperState.SyncConnected){

        //解除主程序在CountDownLatch上的等待阻塞  
        System.out.println("process方法执行了...");  
        // 删除节点  
        try {  
            deleteNoteSync();  
        } catch (KeeperException e) {  
            e.printStackTrace();  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }

    }

}

/\*  
    删除节点的方法  
 \*/  
private void deleteNoteSync() throws KeeperException, InterruptedException {

    /\*  
      zooKeeper.exists(path,watch) :判断节点是否存在  
      zookeeper.delete(path,version) : 删除节点  
  \*/

    Stat stat = zooKeeper.exists("/g-persistent/c1", false);  
    System.out.println(stat == null ? "该节点不存在":"该节点存在");

    if(stat != null){  
        zooKeeper.delete("/g-persistent/c1",-1);  
    }

    Stat stat2 = zooKeeper.exists("/g-persistent/c1", false);  
    System.out.println(stat2 == null ? "该节点不存在":"该节点存在");

}

}

更新节点

/*
更新数据节点内容的方法
*/
private void updateNoteSync() throws KeeperException, InterruptedException {

     /\*  
        path:路径  
        data:要修改的内容 byte\[\]  
        version:为-1,表示对最新版本的数据进行修改  
        zooKeeper.setData(path, data,version);  
     \*/

    byte\[\] data = zooKeeper.getData("/g-persistent", false, null);  
    System.out.println("修改前的值:" + new String(data));

    //修改/lg-persistent 的数据 stat: 状态信息对象  
    Stat stat = zooKeeper.setData("/g-persistent", "客户端修改了节点数据".getBytes(), -1);

    byte\[\] data2 = zooKeeper.getData("/g-persistent", false, null);  
    System.out.println("修改后的值:" + new String(data2));

}