赞
踩
1.把zookeeper压缩包拷贝到3台电脑(也可以是1、3、5……等奇数台)
2.配置conf/zoo.cfg:
3.分别在3台电脑的${dataDir}目录下创建名为myid的文件:
/zookeeperdata/myid
server1机器的内容为1,
server2机器的内容为2,
server3机器的内容为3。
zkServer命令:
zkClient命令:
执行./zkCli.sh进入客户端界面。
或者执行./zkCli.sh [-server 127.0.0.1:2181]。
注意:
删除节点有2个命令:delete(只能删除没有子节点的节点)和rmr(递归删除,被删除的节点可以有子节点;在ZooKeeper 3.5及以上版本中rmr已被标记为废弃,应改用deleteall)。
package cn.sky.zookeepertest;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
 * Small demo of the native ZooKeeper Java client.
 *
 * <p>Connects to a local server, waits until the session is established and
 * then runs one of the sample operations (list children, watch a path,
 * create a node, delete a node).
 */
public class Main {

    /**
     * Opens a session, blocks until it is connected, lists the children of
     * the root node and closes the session again.
     *
     * @param args unused
     * @throws Exception if connecting or any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        String connectString = "127.0.0.1:2181";
        // Pseudo-cluster alternative:
        // String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

        // Released by the watcher below once the session reports SyncConnected.
        CountDownLatch cdl = new CountDownLatch(1);
        ZooKeeper zkCli = new ZooKeeper(connectString, 1000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                KeeperState state = event.getState();
                if (state == KeeperState.SyncConnected) {
                    System.out.println("SyncConnected");
                    cdl.countDown();
                }
            }
        });
        try {
            // Wait until the latch reaches zero, i.e. the connection succeeded.
            cdl.await();
            getChildren(zkCli, "/");
            // watchPath(zkCli,"/myzk");
            // createPath(zkCli);
            // deletePath(zkCli);
        } finally {
            // Always release the session, even when an operation above throws.
            zkCli.close();
        }
    }

    /**
     * Prints the names of the children of {@code path} without setting a watch.
     *
     * @param zkCli connected client
     * @param path  parent node whose children are listed
     * @throws Exception if the ZooKeeper call fails
     */
    static void getChildren(ZooKeeper zkCli, String path) throws Exception {
        List<String> paths = zkCli.getChildren(path, false);
        System.out.println(paths);
    }

    /**
     * Deletes the {@code /myzk} node; version {@code -1} is passed to the client.
     *
     * @param zkCli connected client
     * @throws Exception if the ZooKeeper call fails
     */
    static void deletePath(ZooKeeper zkCli) throws Exception {
        zkCli.delete("/myzk", -1);
    }

    /**
     * Registers a watcher on {@code path} via {@code exists} and prints the
     * type of the event delivered to it.
     *
     * @param zkCli connected client
     * @param path  node to watch
     * @throws Exception if the ZooKeeper call fails
     */
    static void watchPath(ZooKeeper zkCli, String path) throws Exception {
        zkCli.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                EventType type = event.getType();
                System.out.println(path + "==>" + type.name());
            }
        });
    }

    /**
     * Creates the ephemeral node {@code /myzk} with the payload "hello" unless
     * it already exists, printing which of the two cases applied.
     *
     * @param zkCli connected client
     * @throws Exception if the ZooKeeper call fails
     */
    static void createPath(ZooKeeper zkCli) throws Exception {
        Stat stat = zkCli.exists("/myzk", false);
        if (stat != null) {
            System.out.println("/myzk节点已经存在,不能重复创建该节点");
        } else {
            System.out.println("/myzk节点不存在,现在开始创建/myzk节点");
            // Charset constant instead of a charset name string: no lookup by name at runtime.
            byte[] payload = "hello".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            zkCli.create("/myzk", payload, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
    }
}

package cn.sky.zookeepertest;
import java.net.URLDecoder;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Demo of common Apache Curator operations against a local ZooKeeper server:
 * connection-state listening, child listing, child watching, node creation,
 * tree printing and a distributed lock exercised by two threads.
 */
public class Main {
    private static final CuratorFramework client;
    private static final String connectString = "127.0.0.1:2181";
    // Two independent mutex instances on the same path, used by testLock().
    private static final InterProcessLock lock1;
    private static final InterProcessLock lock2;
    private static final String lockpath = "/mylock";

    static {
        client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(1000, 3));
        lock1 = new InterProcessMutex(client, lockpath);
        lock2 = new InterProcessMutex(client, lockpath);
    }

    /**
     * Starts the Curator client; uncomment one of the calls to run a demo.
     *
     * @param args unused
     * @throws Exception if a demo operation fails
     */
    public static void main(String[] args) throws Exception {
        client.start();
        // watchConnectionState();
        // getChildren("/");
        // watchNode("/");
        // createNode("/myzk/a");
        // Thread.sleep(1000);
        // System.out.println("================");
        // printZK("/","");
        // testLock();
    }

    /** Simulated work done while holding the lock: prints begin, sleeps 5 s, prints end. */
    private static void doSomething() throws Exception {
        System.out.println(Thread.currentThread().getName() + " begin.");
        Thread.sleep(5000);
        System.out.println(Thread.currentThread().getName() + " end.");
    }

    /**
     * Exercises the distributed lock, using two threads to simulate two
     * competing participants. Each thread waits up to 6 seconds for its lock.
     */
    static void testLock() {
        int getLockTimeout = 6;
        Thread t1 = new Thread(() -> runWithLock(lock1, getLockTimeout));
        Thread t2 = new Thread(() -> runWithLock(lock2, getLockTimeout));
        t1.start();
        t2.start();
    }

    /**
     * Tries to acquire {@code lock} within {@code timeoutSeconds}, runs
     * {@link #doSomething()} on success and releases the lock afterwards.
     * Exceptions are printed rather than propagated.
     */
    private static void runWithLock(InterProcessLock lock, int timeoutSeconds) {
        // Remember whether THIS call obtained the lock, so release() is only
        // attempted by the holder instead of relying on a process-wide query.
        boolean acquired = false;
        try {
            acquired = lock.acquire(timeoutSeconds, TimeUnit.SECONDS);
            if (acquired) {
                doSomething();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (acquired) {
                try {
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Recursively prints the node at {@code path} and everything below it.
     * Each line is the prefix followed by the URL-decoded last path segment
     * (including its leading slash); children get one more space of prefix.
     *
     * @param path absolute node path to print
     * @param prex prefix printed before the node name
     * @throws Exception if a Curator call fails
     */
    static void printZK(String path, String prex) throws Exception {
        String lastSegment = path.substring(path.lastIndexOf("/"));
        System.out.println(prex + URLDecoder.decode(lastSegment, "UTF-8"));
        if (client.checkExists().forPath(path) != null) {
            List<String> children = client.getChildren().forPath(path);
            for (String child : children) {
                // The root already ends with a slash; do not add a second one.
                String childPath = path.equals("/") ? path + child : path + "/" + child;
                printZK(childPath, prex + " ");
            }
        }
    }

    /** Prints every event delivered by a {@link PathChildrenCache}. */
    static class MyPathChildrenCacheListener implements PathChildrenCacheListener {
        private String parentPath;

        public MyPathChildrenCacheListener(String parentPath) {
            this.parentPath = parentPath;
        }

        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            Type type = event.getType();
            // Connection-level and INITIALIZED events carry no child data, so
            // getData() may be null; only read the path when data is present.
            String childPath = event.getData() != null ? event.getData().getPath() : null;
            switch (type) {
                case CHILD_ADDED: // fired after a child (childPath) is added under the parent
                    System.out.println("child_added:" + childPath);
                    break;
                case CHILD_REMOVED: // fired after a child of the parent is deleted
                    System.out.println("child_removed:" + childPath);
                    break;
                case CHILD_UPDATED: // fired after the value of a child of the parent changes
                    System.out.println("child_updated:" + childPath);
                    break;
                case CONNECTION_LOST:
                    System.out.println("connection_lost");
                    break;
                case CONNECTION_SUSPENDED:
                    System.out.println("connection_suspended");
                    break;
                case INITIALIZED:
                    System.out.println("initialized");
                    break;
                case CONNECTION_RECONNECTED:
                    System.out.println("connection_reconnected");
                    break;
                default:
                    System.out.println("default");
                    break;
            }
        }
    }

    /** Prints every connection state change reported by the client. */
    static class MyConnectionStateListener implements ConnectionStateListener {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            switch (newState) {
                case CONNECTED: // fired after the first successful connection to the server
                    System.out.println("connected");
                    break;
                case LOST: // observed ten-odd seconds (not fixed) after the server was shut down
                    System.out.println("lost");
                    break;
                case READ_ONLY:
                    System.out.println("read_only");
                    break;
                case RECONNECTED: // observed right after the server was started again
                    System.out.println("reconnected");
                    break;
                case SUSPENDED: // observed right after the server was shut down
                    System.out.println("suspended");
                    break;
                default:
                    System.out.println("default");
                    break;
            }
        }
    }

    /** Registers a listener that prints client connection state changes. */
    static void watchConnectionState() {
        client.getConnectionStateListenable().addListener(new MyConnectionStateListener());
    }

    /**
     * Watches the children of {@code parentNode} and prints every change.
     *
     * @param parentNode parent node whose children are watched
     * @throws Exception if the cache cannot be started
     */
    static void watchNode(String parentNode) throws Exception {
        PathChildrenCache cache = new PathChildrenCache(client, parentNode, false);
        // Register the listener BEFORE starting so no early event is missed.
        cache.getListenable().addListener(new MyPathChildrenCacheListener(parentNode));
        // NOTE(review): the original author observed that start() creates
        // parentNode as a persistent node when it does not exist yet.
        cache.start();
        // The cache is intentionally left open: closing it stops the watching.
    }

    /**
     * Prints the names of the children of {@code parentNode}.
     *
     * @param parentNode parent node whose children are listed
     * @throws Exception if the Curator call fails
     */
    static void getChildren(String parentNode) throws Exception {
        List<String> nodes = client.getChildren().forPath(parentNode);
        System.out.println(nodes);
    }

    /**
     * Creates {@code node} as an ephemeral znode unless it already exists,
     * printing which of the two cases applied. Missing parent nodes are
     * created as well, so nested paths such as {@code /myzk/a} work even when
     * the parent is absent.
     *
     * @param node absolute path of the node to create
     * @throws Exception if the Curator call fails
     */
    static void createNode(String node) throws Exception {
        Stat stat = client.checkExists().forPath(node);
        if (stat != null) {
            System.out.println(node + "节点已存在");
        } else {
            System.out.println(node + "节点不存在,现在创建");
            // CreateMode.EPHEMERAL: the znode is deleted upon the client's disconnect.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(node);
            // CreateMode.PERSISTENT: the znode is not deleted automatically upon disconnect.
            // client.create().withMode(CreateMode.PERSISTENT).forPath(node);
        }
    }
}

注意点:
1.调用以下方法后
new PathChildrenCache(client, parentPath, false).start();
parentPath会以永久节点的形式被创建。
查看源码的部分调用关系为:
start()-->
start(StartMode.NORMAL)-->
offerOperation(new RefreshOperation(this, RefreshMode.STANDARD))-->
operation.invoke()-->
cache.refresh(mode)-->
ensurePath()-->
ensureContainers.ensure()-->
internalEnsure()-->
client.createContainers(path)-->
checkExists().creatingParentContainersIfNeeded().forPath(ZKPaths.makePath(path, "foo"))
2.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener的public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception 方法:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。