安装zookeeper :linux下安装Zookeeper 3.4.14
zookeeper 分为5个包:
org.apache.zookeeper //客户端主要类文件
org.apache.zookeeper.data //
org.apache.zookeeper.server
org.apache.zookeeper.server.quorum
org.apache.zookeeper.server.upgrade
引入依赖:
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateSession implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
/\*
建立会话
\*/
public static void main(String\[\] args) throws IOException, InterruptedException {
/\*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
\*/
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateSession());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
countDownLatch.await();
System.out.println("客户端与服务端会话真正建立了");
}
/\*
回调方法:处理来自服务器端的watcher通知
\*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
countDownLatch.countDown();
}
}
}
注意:ZooKeeper客户端和服务端会话的建立是一个异步的过程,也就是说在程序中,构造方法会在处理完客户端初始化工作后立即返回,在大多数情况下,此时并没有真正建立好一个可用的会话,在会话的生命周期中处于“ CONNECTING的状态。当该会话真正创建完毕后 ZooKeeper服务端会向会话对应的客户端发送一个事件通知,以告知客户端,客户端只有在获取这个通知之后,才算真正建立了会话
节点介绍:
/**
* path :节点创建的路径
* data[] :节点创建要保存的数据,是个byte类型的
* acl :节点创建的权限信息(4种类型)
* ANYONE_ID_UNSAFE : 表示任何人
* AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
* OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
* CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限
* createMode :创建节点的类型(4种类型)
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
*/
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class CreateNote implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/\*
建立会话
\*/
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {
/\*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
\*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new CreateNote());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\\
Thread.sleep(Integer.MAX\_VALUE);
}
/\*
回调方法:处理来自服务器端的watcher通知
\*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 创建节点
try {
createNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/\*
创建节点的方法
*/
private static void createNoteSync() throws KeeperException, InterruptedException {
/\*\*
\* path :节点创建的路径
\* data\[\] :节点创建要保存的数据,是个byte类型的
\* acl :节点创建的权限信息(4种类型)
\* ANYONE\_ID\_UNSAFE : 表示任何人
\* AUTH\_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
\* OPEN\_ACL\_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone
\* CREATOR\_ALL\_ACL :此ACL授予创建者身份验证ID的所有权限
\* createMode :创建节点的类型(4种类型)
\* PERSISTENT:持久节点
\* PERSISTENT\_SEQUENTIAL:持久顺序节点
\* EPHEMERAL:临时节点
\* EPHEMERAL\_SEQUENTIAL:临时顺序节点
String node = zookeeper.create(path,data,acl,createMode);
\*/
// 持久节点
String note\_persistent = zooKeeper.create("/g-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.PERSISTENT);
// 临时节点
String note\_ephemeral = zooKeeper.create("/g-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.EPHEMERAL);
// 持久顺序节点
String note\_persistent\_sequential = zooKeeper.create("/g-persistent\_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN\_ACL\_UNSAFE, CreateMode.PERSISTENT\_SEQUENTIAL);
System.out.println("创建的持久节点" + note\_persistent);
System.out.println("创建的临时节点" + note\_ephemeral);
System.out.println("创建的持久顺序节点" + note\_persistent\_sequential);
}
}
public class GetNoteData implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/\*
建立会话
\*/
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {
/\*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
\*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new GetNoteData());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\\
Thread.sleep(Integer.MAX\_VALUE);
}
/\*
回调方法:处理来自服务器端的watcher通知
\*/
public void process(WatchedEvent watchedEvent) {
/\*
子节点列表发生改变时,服务器端会发生noteChildrenChanged事件通知
要重新获取子节点列表,同时注意:通知是一次性的,需要反复注册监听
\*/
if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
List<String> children = null;
try {
children = zooKeeper.getChildren("/g-persistent", true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(children);
}
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 获取节点数据的方法
try {
getNoteData();
// 获取节点的子节点列表方法
getChildrens();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/\*
获取某个节点的内容
\*/
private void getNoteData() throws KeeperException, InterruptedException {
/\*\*
\* path : 获取数据的路径
\* watch : 是否开启监听
\* stat : 节点状态信息
\* null: 表示获取最新版本的数据
\* zk.getData(path, watch, stat);
\*/
byte\[\] data = zooKeeper.getData("/g-persistent", false, null);
System.out.println(new String(data));
}
/\*
获取某个节点的子节点列表方法
\*/
public static void getChildrens() throws KeeperException, InterruptedException {
/\*
path:路径
watch:是否要启动监听,当子节点列表发生变化,会触发监听
zooKeeper.getChildren(path, watch);
\*/
List<String> children = zooKeeper.getChildren("/g-persistent", true);
System.out.println(children);
}
}
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class DeleteNote implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper;
/\*
建立会话
\*/
public static void main(String\[\] args) throws IOException, InterruptedException, KeeperException {
/\*
客户端可以通过创建一个zk实例来连接zk服务器
new Zookeeper(connectString,sesssionTimeOut,Wather)
connectString: 连接地址:IP:端口
sesssionTimeOut:会话超时时间:单位毫秒
Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
\*/
zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new DeleteNote());
System.out.println(zooKeeper.getState());
// 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞
//countDownLatch.await();\\
Thread.sleep(Integer.MAX\_VALUE);
}
/\*
回调方法:处理来自服务器端的watcher通知
\*/
public void process(WatchedEvent watchedEvent) {
// SyncConnected
if(watchedEvent.getState() == Event.KeeperState.SyncConnected){
//解除主程序在CountDownLatch上的等待阻塞
System.out.println("process方法执行了...");
// 删除节点
try {
deleteNoteSync();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/\*
删除节点的方法
\*/
private void deleteNoteSync() throws KeeperException, InterruptedException {
/\*
zooKeeper.exists(path,watch) :判断节点是否存在
zookeeper.delete(path,version) : 删除节点
\*/
Stat stat = zooKeeper.exists("/g-persistent/c1", false);
System.out.println(stat == null ? "该节点不存在":"该节点存在");
if(stat != null){
zooKeeper.delete("/g-persistent/c1",-1);
}
Stat stat2 = zooKeeper.exists("/g-persistent/c1", false);
System.out.println(stat2 == null ? "该节点不存在":"该节点存在");
}
}
/*
更新数据节点内容的方法
*/
private void updateNoteSync() throws KeeperException, InterruptedException {
/\*
path:路径
data:要修改的内容 byte\[\]
version:为-1,表示对最新版本的数据进行修改
zooKeeper.setData(path, data,version);
\*/
byte\[\] data = zooKeeper.getData("/g-persistent", false, null);
System.out.println("修改前的值:" + new String(data));
//修改/lg-persistent 的数据 stat: 状态信息对象
Stat stat = zooKeeper.setData("/g-persistent", "客户端修改了节点数据".getBytes(), -1);
byte\[\] data2 = zooKeeper.getData("/g-persistent", false, null);
System.out.println("修改后的值:" + new String(data2));
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章