六、Zookeeper-开源客户端ZkClient与Curator
阅读原文时间:2023年07月08日阅读:1

从创建会话、创建节点、读取数据、更新数据、删除节点拉介绍ZkClient

添加依赖:

pom.xml

com.101tec zkclient 0.2

创建会话,连接服务端:

public class CreateSession {
/*
zkClient连接服务端
zkClient通过对zookeeperAPI的内部包装,将这个会话创建的过程同步化了
*/
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
}
}

创建节点:

ZkClient提供了递归创建节点的接口、先创建父节点,在创建子节点。

public class Create_Node_Sample {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
//createParent设置为true,可递归创建
zkClient.createPersistent("/g-zkClient/g-c1",true);
System.out.println("success create znode.");
}

删除节点:

ZkClient提供了递归删除节点,先删除子节点,再删除父节点。

public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/g-zkClient/g-c1";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
}
}

获取子节点:

/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) throws InterruptedException {

    /\*  
        创建一个zkclient实例就可以完成连接,完成会话的创建  
        serverString : 服务器连接地址

        注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..  
     \*/

    ZkClient zkClient = new ZkClient("127.0.0.1:2181");  
    System.out.println("会话被创建了..");

    // 获取子节点列表  
    List<String> children = zkClient.getChildren("/g-zkclient");  
    System.out.println(children);

    // 注册监听事件

    /\*  
        客户端可以对一个不存在的节点进行子节点变更的监听  
        只要该节点的子节点列表发生变化,或者该节点本身被创建或者删除,都会触发监听  
     \*/  
    zkClient.subscribeChildChanges("/g-zkclient-get", new IZkChildListener() {

        /\*  
            s : parentPath  
            list : 变化后子节点列表  
         \*/

        public void handleChildChange(String parentPath, List<String> list) throws Exception {  
            System.out.println(parentPath + "的子节点列表发生了变化,变化后的子节点列表为"+ list);

        }  
    });

    //测试  
    zkClient.createPersistent("/g-zkclient-get");  
    Thread.sleep(1000);

    zkClient.createPersistent("/g-zkclient-get/c1");  
    Thread.sleep(1000);

}

获取数据:

/*
借助zkclient完成会话的创建
*/
public static void main(String[] args) throws InterruptedException {

    /\*  
        创建一个zkclient实例就可以完成连接,完成会话的创建  
        serverString : 服务器连接地址

        注意:zkClient通过对zookeeperAPI内部封装,将这个异步创建会话的过程同步化了..  
     \*/

    ZkClient zkClient = new ZkClient("127.0.0.1:2181");  
    System.out.println("会话被创建了..");

    // 判断节点是否存在  
    String path = "/g-zkClient-Ep";  
    boolean exists = zkClient.exists(path);

    if(!exists){  
        // 创建临时节点  
        zkClient.createEphemeral(path,"123");  
    }

    // 读取节点内容  
    Object o = zkClient.readData(path);  
    System.out.println(o);

    // 注册监听  
    zkClient.subscribeDataChanges(path, new IZkDataListener() {

        /\*  
            当节点数据内容发生变化时,执行的回调方法  
            s: path  
            o: 变化后的节点内容  
         \*/  
        public void handleDataChange(String s, Object o) throws Exception {  
            System.out.println(s+"该节点内容被更新,更新的内容"+o);  
        }

        /\*  
            当节点被删除时,会执行的回调方法  
            s : path  
         \*/  
        public void handleDataDeleted(String s) throws Exception {  
            System.out.println(s+"该节点被删除");  
        }  
    });

    // 更新节点内容  
    zkClient.writeData(path,"456");  
    Thread.sleep(1000);

    // 删除节点  
    zkClient.delete(path);  
    Thread.sleep(1000);

}

Curator 是Netfilx公司开源的一套Zookeeper客户端框架,和ZkClient一样。解决了很多Zookeeper客户端底层非常细节的开发工作。

连接重连、反复注册Watcher、NodeExistsException基于Fluent编程风格。

添加依赖:

org.apache.curator curator-framework 2.12.0

创建会话:

Curator创建会话,是是通过CuratorFrameworkFactory的工厂类来实现的。

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

参数 RetryPolicy重试策略接口、ExponentialBackoffRetry (基于 Backoff 重连策略)、RetryNTimes(重连N次策略)、RetryForever (永久重连)

通过调用CuratorFramework中的 start()方法,来启动会话。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,1000,retryPolicy);
client.start();

基于Fluent编程风格

/*
connectString: zk server地址,多个server逗号分隔
connectionTimeoutMs:链接超时时间 默认15s
sessionTimeoutMs:会话超时时间 默认60s
retryPolicy: 失重策略:
ExponentialBackoffRetr:ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

baseSleepTimeMs:重试sleep时间,用于计算每次重试sleep时间 (sleep=baseSleepTimeMs*Math.max(1,__random.nextInt(1<<(retryCount+1)))
maxRetries: 最大重试次数
maxSleepMs:最大__sleep时间

*/

public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181", 5000, 3000, retryPolicy);
client.start();

System.out.println("Zookeeper session1 established. ");
CuratorFramework client1 = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181") 
.sessionTimeoutMs(5000) 
.connectionTimeoutMs(3000) 
.retryPolicy(retryPolicy)
.namespace("base") // 独立命名空间/base
.build(); //
client1.start();
System.out.println("Zookeeper session established. ");
}

1、创建节点:

//创建内容为空的节点:
client.create().forPath(path);

//创建内容不为空的节点:
client.create().forPath(path,"内容".getBytes());

//递归创建父节点,并选着节点类型:
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);

2、删除节点

client.delete().forPath(path);//删除一个节点
client.delete().deletingChildrenIfNeeded().forPath(path); // 删除节点并递归删除子节点
client.delete().withVersion(1).forPath(path);//指定版本删除 1表示最新版本
client.delete().guaranteed().forPath(path); //强制删除

3、获取数据

// 普通查询
client.getData().forPath(path);
// 包含状态查询
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);

4、更新数据

// 普通更新
client.setData().forPath(path,"新内容".getBytes());
// 指定版本更新
client.setData().withVersion(1).forPath(path);

//版本不一致报错
org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
BadVersion for

public static void main(String[] args) throws Exception {
String path="/wg_curator/w1";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//使用Fluent编程风格 namespace 命名空间 /base zookeeper 业务隔离
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("152.136.193.58:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
//递归创建节点
System.out.println("使用Fluent编程风格 session connection…");
String path1 = client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
System.out.println("session connection… 创建Node sucess.." +path);

    //获取节点数据  
    byte\[\] bytes1 = client.getData().forPath(path);  
    Stat s =new Stat();  
    byte\[\] bytes = client.getData().storingStatIn(s).forPath(path);  
    System.out.println("session connection...  获取Node sucess.." +path);  
    System.out.println("session connection...  获取Node数据内容" +new String(bytes1));  
    System.out.println("session connection...  获取Node Data 包含状态查询" +s);

    //递归删除Node  
    client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);  
    System.out.println("session connection...  递归删除Node sucess.." +path);  
}