赞
踩
Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
Zookeeper 从设计模式角度来理解:是一个基于观案者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
1)Zookeeper:一个领导者(Leader) ,多个跟随者(Follower)组成的集群。
2)集群中只要有半数以上节点存活,Zookeeper 集群就能正常服务。
3)全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
4)更新请求顺序进行,来自同一个 Client 的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client 能读到最新数据。
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1 MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名 ,便于识别。例如:IP 不容易记住,而域名容易记住。
统一配置管理
(1)分布式环境下,配置文件同步非常常见。
① 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
② 对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由 ZooKeeper 实现。
① 可将配置信息写入 ZooKeeper 上的一个 Znode 。
② 各个客户端服务器监听这个 Znode。
③ 一旦 Znode 中的数据被修改,ZooKeeper 将通知各个客户端服务器。
统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。
可根据节点实时状态做出一些调整。
(2)ZooKeeper 可以实现实时监控节点状态变化
① 可将节点信息写入Z ooKeeper 上的一个 ZNode。
② 监听这个 ZNode 可获取它的实时状态变化。
服务器动态上下线
软负载均衡
在 Zookeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
zookeeper 官网
准备工作
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/
mv apache-zookeeper-3.5.6-bin/ zookeeper
mv zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper/data
vim zoo.cfg
dataDir=/usr/local/zookeeper/data
vim /etc/profile
在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=
P
A
T
H
:
PATH:
PATH:ZOOKEEPER_HOME/bin
source /etc/profile
启动 Zookeeper
zkServer.sh start
启动客户端
zkCli.sh
退出客户端
quit
停止 Zookeeper
zkServer.sh stop
集群规划
在 master、slave1 和 slave2 三个节点上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目录内容到 slave1、slave2
xsync zookeeper/
配置服务器编号
① 在 /usr/local/zookeeper/ 这个目录下创建 zkData
mkdir data
② /usr/local/zookeeper/data 目录下创建一个 myid 的文件
touch myid
③ 编辑 myid 文件
vim myid
在文件中添加与 server 对应的编号:
0
④ 分发到其他机器上
xsync myid
并分别在 slave1、slave2 上修改 myid 文件中内容为 1、2
配置 zoo.cfg 文件
① 将 /usr/local/zookeeper/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打开 zoo.cfg 文件,修改 dataDir 路径
dataDir=/usr/local/zookeeper/data
增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg
修改环境变量
① 打开配置文件
vim /etc/profile
② 在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=
P
A
T
H
:
PATH:
PATH:ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三台机器)
source /etc/profile
集群操作
① 三台机器分别启动 Zookeeper
zkServer.sh start
② 三台机器分别关闭 Zookeeper
zkServer.sh stop
编写 Zookeeper 的群起群关脚本
① 在 /usr/local/bin 目录下创建 zk 文件
vim zk.sh
#!/bin/bash case $1 in "start"){ for i in master slave1 slave2 do echo "****************** $i *********************" ssh $i "source /etc/profile && zkServer.sh start" done };; "stop"){ for i in master slave1 slave2 do echo "****************** $i *********************" ssh $i "source /etc/profile && zkServer.sh stop" done };; esac
修改脚本 zk 具有执行权限
chmod 777 zk.sh
调用脚本形式:zk start 或 zk stop
Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒
Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime 时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session 的最小超时时间是 2*tickTime)
initLimit =10:LF 初始通信时限
集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
syncLimit =5:LF 同步通信时限
集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
dataDir:数据文件目录+数据持久化路径
主要用于保存 Zookeeper 中的数据。
clientPort =2181:客户端连接端口
监听客户端连接的端口。
server.A=B:C:D
A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个server。
B 是这个服务器的 ip 地址;
C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
半数机制
集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。
选举过程例子
假设有五台服务器组成的 Zookeeper 集群,它们的 id 从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动。
① 服务器 1 启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
② 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。
③ 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。
④ 服务器 4 启动,根据前面的分析,理论上服务器4应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以它只能接收当小弟的命了。
⑤ 服务器 5 启动,同 4 一样当小弟。
持久(Persistent)
客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral)
客户端和服务器端断开连接后,创建的节点自己删除
节点类型
① 持久化目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在。
② 持久化顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号
③ 临时目录节点
客户端与 Zookeeper 断开连接后,该节点被删除
④ 临时顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号。
说明: 创建 znode 时设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
注意: 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
czxid: 创建节点的事务 zxid
每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeepe r事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
ctime: znode 被创建的毫秒数(从 1970 年开始)
mzxid: znode 最后更新的事务 zxid
mtime: znode 最后修改的毫秒数(从 1970 年开始)
pZxid: znode 最后更新的子节点 zxid
cversion : znode 子节点变化号,znode 子节点修改次数
dataversion: znode 数据变化号
aclVersion: znode 访问控制列表的变化号
ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
dataLength: znode 的数据长度
numChildren: znode 子节点数量
监听原理详解:
① 首先要有一个 main() 线程
② 在 main 线程中创建 Zokeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener) 。
③ 通过 connect 线程将注册的监听事件发送给 Zookeeper。
④ 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中。
⑤ Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程。
⑥ listener 线程内部调用了 process() 方法。
常见的监听
① 监听节点数据的变化
get -w path
② 监听子节点增减的变化
ls -w path
启动客户端
zkCli.sh
显示所有操作命令
help
查看当前 znode 中所包含的内容
ls /
ls2 /
查看当前节点详细数据
ls -s /
分别创建 2 个普通节点
create /animals “dog”
create /animals/small “ant”
获得节点的值
get /animals
get /animals/small
创建短暂节点
create -e /animals/big “elephant”
创建带序号的节点
create -s /animals/middle “hourse”
修改节点数据值
set /animals/small “bug”
节点的值变化监听
① 在 slave1 主机上注册监听 /animals 节点数据变化
get -w /animals
② 在 slave2 主机上修改 /animals 节点的数据
set /animals “cat”
③ 观察 slave1 主机收到子节点变化的监听
节点的子节点变化监听(路径变化)
① 在 slave1 主机上注册监听 /animals 节点的子节点变化
ls -w /animals
② 在 slave2 主机 /animals 节点上创建子节点
create /animals/mini “fly”
③ 观察 slave1 主机收到子节点变化的监听
删除节点
delete /animals/big
递归删除节点
deleteall /animals/mini
查看节点状态
stat /animals
创建一个 Maven 工程
在 pom 文件中添加依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.9</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> </dependencies>
在项目的 src/main/resources 目录下,新建一个文件,命名为 “log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
@SpringBootTest
public class ZookeeperTest {
private static String connectString = "localhost:2181";
private static int sessionTimeout = 2000;
private static ZooKeeper zkClient;
@Test
public static void init() throws IOException {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent watchedEvent) {
}
});
}
}
4.3.3 创建子节点
先将上面的 init() 方法前面的注解 @Test 改为 @BeforeAll
// 创建子节点
@SpringBootTest public class ZookeeperTest { private static String connectString = "localhost:2181"; private static int sessionTimeout = 2000; private static ZooKeeper zkClient; @BeforeAll public static void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { } }); } @Test public void createNode() throws Exception { // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(path); } }
// 获取子节点并监听节点变化 @SpringBootTest public class WatchTest { private static String connectString = "localhost:2181"; private static int sessionTimeout = 2000; private static ZooKeeper zkClient; @Test public void getChildrenAndWatch() throws Exception { List<String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } // 延时阻塞 Thread.sleep(Long.MAX_VALUE); } @BeforeAll public static void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { List<String> children = null; try { children = zkClient.getChildren("/", true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } for (String child : children) { System.out.println(child); } } }); } }
// 判断znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析
代码实现
① 先在集群上创建 /servers 节点
create /servers “servers”
② 服务器端向 Zookeeper 注册代码
package zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributeServer {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws Exception { args = new String[]{"slave1"}; DistributeServer server = new DistributeServer(); // 1.连接zookeeper集群 server.getConnect(); // 2.注册节点 server.register(args[0]); // 3.业务逻辑处理 server.business(); } private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { } }); } private void register(String hostname) throws KeeperException, InterruptedException { String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online"); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); }
}
③ 客户端代码
package zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributeClient {
private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws Exception { DistributeClient client = new DistributeClient(); // 1.连接zookeeper集群 client.getConnect(); // 2.注册监听 client.getChildren(); // 3.业务逻辑处理 client.business(); } private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { try { getChildren(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } private void getChildren() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/servers", true); // 存储服务器节点主机名称集合 ArrayList<String> hosts = new ArrayList<String>(); for (String child : children) { byte[] data = zkClient.getData("/servers/" + child, false, null); hosts.add(new String(data)); } System.out.println(hosts); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); }
}
ZkClient 是由 Datameer 的工程师开发的开源客户端,对 Zookeeper 的原生 API 进行了包装,实现了超时重连、Watcher 反复注册等功能。
在使用 ZooKeeper 的 Java 客户端时,经常需要处理几个问题:重复注册 watcher、session失效重连、异常处理。
IZKConnection:是一个ZkClient与Zookeeper之间的一个适配器;在代码里直接使用的是ZKClient,实质上还是委托了zookeeper来处理了。
在ZKClient中,根据事件类型,分为
节点事件(数据事件),对应的事件处理器是IZKDataListener;
子节点事件,对应的事件处理器是IZKChildListener;
Session事件,对应的事件处理器是IZKStatusListener;
ZkEventThread:是专门用来处理事件的线程
目前已经运用到了很多项目中,知名的有 Dubbo、Kafka、Helix。
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.11</version>
</dependency>
从上述结构上看,IZKConnection 是一个 ZkClient 与 ZooKeeper 之间的一个适配器。在代码里直接使用的是 ZKClient,其实质还是委托了 zookeeper 来处理了。
使用 ZooKeeper 客户端来注册 watcher 有几种方法: 1、创建 ZooKeeper 对象时指定默认的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注册的是某个节点的事件处理器(watcher),getchildren 注册的是子节点的事件处理器(watcher)。而在 ZKClient 中,根据事件类型,分为了节点事件(数据事件)、子节点事件。对应的事件处理器则是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相关的事件和事件处理器。
ZkEventThread 是专门用来处理事件的线程。
启动 ZKClient
在创建 ZKClient 对象时,就完成了到 ZooKeeper 服务器连接的建立。具体过程是这样的:
启动时,指定好 connection string,连接超时时间,序列化工具等。
创建并启动 eventThread,用于接收事件,并调度事件监听器 Listener 的执行。
连接到 zookeeper 服务器,同时将 ZKClient 自身作为默认的 Watcher。
为节点注册Watcher:
ZooKeeper 的三个方法:getData、getChildren、exists.
ZKClient 都提供了相应的代理方法。就拿 exists 来看:
可以看到,是否注册 watcher,由 hasListeners(path)来决定的。
hasListeners 就是看有没有与该数据节点绑定的 listener。
所以,默认情况下,都会自动的为指定的 path 注册 watcher,并且是默认的 watcher (ZKClient)。怎么才能让 hasListeners 判定值为 true 呢,也就是怎么才能为 path 绑定 Listener 呢?
ZKClient提供了订阅功能:
一个新建的会话,只需要在取得响应的数据节点后,调用 subscribteXxx 就可以订阅上相应的事件了。
前面已经知道,ZKClient 是默认的 Watcher,并且在为各个数据节点注册的 Watcher 都是这个默认的 Watcher。那么该是如何将各种事件通知给相应的 Listener 呢?
处理过程大致可以概括为下面的步骤:
判断变更类型:变更类型分为 State 变更、ChildNode 变更(创建子节点、删除子节点、修改子节点数据)、NodeData 变更(创建指定 node,删除节点,节点数据变更)。
取出与 path 关联的 Listeners,并为每一个 Listener 创建一个 ZKEvent,将 ZkEvent 交给 ZkEventThread 处理。
ZkEventThread 线程,拿到 ZkEvent 后,只需要调用 ZkEvent 的 run 方法进行处理。 从这里也可以知道,具体的怎么如何调用 Listener,还要依赖于 ZkEvent 的 run()实现了。
注册监听 watcher:
接口类 | 注册监听方法 | 解除监听方法 |
---|---|---|
IZkChildListener(子节点) | ZkClient的subscribeChildChanges方法 | ZkClient 的unsubscribeChildChanges 方法 |
IZkDataListener(数据) | ZkClient 的subscribeDataChanges 方法 | ZkClient 的 unsubscribeDataChanges 方法 |
IZkStateListener(客户端状 态) | ZkClient 的 subscribeStateChanges 方 法 | ZkClient 的 unsubscribeStateChanges 方法 |
在 ZkClient 中客户端可以通过注册相关的事件监听来实现对 Zookeeper 服务端时间的订阅。
其中 ZkClient 提供的监听事件接口有以下几种:
其中 ZkClient 还提供了一个 unsubscribeAll 方法,来解除所有监听。
Zookeeper 中提供的变更操作有:节点的创建、删除,节点数据的修改:
创建操作,数据节点分为四种,ZKClient 分别为他们提供了相应的代理:
删除节点的操作:
修改节点数据的操作:
writeDataReturnStat():写数据并返回数据的状态。
updateDataSerialized():修改已序列化的数据。执行过程是:先读取数据,然后使用DataUpdater 对数据修改,最后调用 writeData 将修改后的数据发送给服务端。
ZooKeeper 中,会涉及到序列化、反序列化的操作有两种:getData、setData。在 ZKClient 中,分别用 readData、writeData 来替代了。
对于 readData:先调用 zookeeper 的 getData,然后进行使用 ZKSerializer 进行反序列化工 作。
对于 writeData:先使用 ZKSerializer 将对象序列化后,再调用 zookeeper 的 setData。
Watcher 自动重注册:这个要是依赖于 hasListeners()的判断,来决定是否再次注册。如果对此有不清晰的,可以看上面的流程处理的说明。
Session 失效重连:如果发现会话过期,就先关闭已有连接,再重新建立连接。
异常处理:对比 ZooKeeper 和 ZKClient,就可以发现 ZooKeeper 的所有操作都是抛异常 的,而 ZKClient 的所有操作,都不会抛异常的。在发生异常时,它或做日志,或返回空, 或做相应的 Listener 调用。
相比于 ZooKeeper 官方客户端,使用 ZKClient 时,只需要关注实际的 Listener 实现即可。所 以这个客户端,还是推荐大家使用的。
https://www.cnblogs.com/jinchengll/p/12333213.html
启动ZKClient:在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立
1、启动时,制定好connection string,连接超时时间,序列化工具等
2、创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行
3、连接到Zookeeper服务器,同时将ZKClient自身作为默认的Watcher
为节点注册Watcher
Zookeeper 原始API的三个方法:getData,getChildren、exists,ZKClient都提供了相应的代理方法,比如exists,
hasListeners是看有没有与该数据节点绑定的listener
所以,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient),那么怎样才能让hasListeners值为true呢,也就是怎么才能为path绑定Listener呢?
ZKClient提供了订阅功能,一个新建的会话,只需要在取得响应的数据节点后,调用subscribeXXX就可以订阅上相应的事件了。
public class ZkClientCrud<T> { ZkClient zkClient ; final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class); public ZkClientCrud(ZkSerializer zkSerializer) { logger.info("链接zk开始"); // zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout); zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer); } public void createEphemeral(String path,Object data){ zkClient.createEphemeral(path,data); } /*** * 支持创建递归方式 * @param path * @param createParents */ public void createPersistent(String path,boolean createParents){ zkClient.createPersistent(path,createParents); } /*** * 创建节点 跟上data数据 * @param path * @param data */ public void createPersistent(String path,Object data){ zkClient.createPersistent(path,data); } /*** * 子节点 * @param path * @return */ public List<String> getChildren(String path){ return zkClient.getChildren(path); } public T readData(String path){ return zkClient.readData(path); } public void writeData(String path,Object data){ zkClient.writeData(path,data); } //递归删除 public void deleteRecursive(String path){ zkClient.deleteRecursive(path); } }
public class ZkClientCrudTest { final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class); public static void main(String[] args) { ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer()); String path="/root"; zkClientCrud.deleteRecursive(path); zkClientCrud.createPersistent(path,"hi"); /* zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value //zkClientCrud.createPersistent(path,"hi"); logger.info(zkClientCrud.readData(path)); //更新 zkClientCrud.writeData(path,"hello"); logger.info(zkClientCrud.readData(path)); logger.info(String.valueOf(zkClientCrud.getChildren(path))); //子节点 List<String> list=zkClientCrud.getChildren(path); for(String child:list){ logger.info("子节点:"+child); }*/ User user=new User(); user.setId(1); user.setName("张三"); zkClientCrud.writeData(path,user); System.out.println(zkClientCrud.readData(path).getName());; } }
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private Integer id;
private String name;
}
Watcher
public class ZkClientWatcher { ZkClient zkClient; public ZkClientWatcher() { zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout); } public void createPersistent(String path, Object data) { zkClient.createPersistent(path, data); } public void writeData(String path, Object object) { zkClient.writeData(path, object); } public void delete(String path) { zkClient.delete(path); } public boolean exists(String path) { return zkClient.exists(path); } public void deleteRecursive(String path) { zkClient.deleteRecursive(path); } //对父节点添加监听数据变化。 public void subscribe(String path) { zkClient.subscribeDataChanges(path, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data); } @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.printf("删除的节点为:%s\r\n", dataPath); } }); } //对父节点添加监听子节点变化。 public void subscribe2(String path) { zkClient.subscribeChildChanges(path, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n"); } }); } //客户端状态 public void subscribe3(String path) { zkClient.subscribeStateChanges(new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { if (state == Watcher.Event.KeeperState.SyncConnected) { //当我重新启动后start,监听触发 System.out.println("连接成功"); } else if (state == Watcher.Event.KeeperState.Disconnected) { System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发 } else System.out.println("其他状态" + state); } @Override public void handleNewSession() throws Exception { System.out.println("重建session"); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { } }); } /* @Override public void handleDataChange(String dataPath, Object data) throws Exception { } @Override public void handleDataDeleted(String dataPath) throws Exception { }*/ }
public class ZkClientWatcherTest { public static void main(String[] args) throws InterruptedException { ZkClientWatcher zkClientWatche=new ZkClientWatcher(); String path="/root"; zkClientWatche.deleteRecursive(path); zkClientWatche.createPersistent(path,"hello"); zkClientWatche.subscribe(path); zkClientWatche.subscribe2(path); // zkClientWatche.subscribe3(path);//需要启服务 // Thread.sleep(Integer.MAX_VALUE); zkClientWatche.createPersistent(path+"/root2","word"); TimeUnit.SECONDS.sleep(1); zkClientWatche.writeData(path,"hi"); TimeUnit.SECONDS.sleep(1); //zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常 zkClientWatche.deleteRecursive(path); TimeUnit.SECONDS.sleep(1); //这个main线程就结束 } }
public class ZookeeperUtil {
/** zookeeper服务器地址 */
// public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";
public static final String connectString = "localhost:2181";
/** 定义session失效时间 */
public static final int sessionTimeout = 5000;
public static final String path = "/root";
}
zookeeper不是为高可用性设计的,但它使用ZAB协议达到了极高的一致性。所以它经常被选作注册中心、配置中心、分布式锁等场景。
它的性能是非常有限的,而且API并不是那么好用。xjjdog倾向于使用基于Raft协议的Etcd或者Consul,它们更加轻量级一些。
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。
Zookeeper 原生API问题:
1.超时重连,不支持自动,需要手动操作
2.Watch注册一次后会失效
3.不支持递归创建节点
Zookeeper API 升级版 Curator:
1.解决watcher的注册一次就失效
2.提供更多解决方案并且实现简单
3.提供常用的ZooKeeper工具类
4.编程风格更爽,点点点就可以了
5.可以递归创建节点等
Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes。
Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。
Curator4 统一对 ZooKeeper 3.4.x 和 3.5.x 的支持
Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自动连接管理:
当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明
监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法
Curator recipes自动移除监控
更加清晰的API
简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等。
curator-recipes:封装了一些高级特性,如:Cache事件监听、 Elections选举、分布式锁、分布式计数器、分布式Barrier、Queues队列等
1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
6.基于curator实现Zookeeper分布式锁(需要掌握基本的多线程知识)
7.基于curator实现分布式计数器
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <!--建议和本地安装版本保持一致--> <version>3.7.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.0</version> </dependency>
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZkConnectCuratorUtil { final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class); public CuratorFramework zkClient = null; //zk的客户端工具Curator(在本类通过new实例化的是,自动start) private static final int MAX_RETRY_TIMES = 3; //定义失败重试次数 private static final int BASE_SLEEP_TIME_MS = 5000; //连接失败后,再次重试的间隔时间 单位:毫秒 private static final int SESSION_TIME_OUT = 1000000; //会话存活时间,根据业务灵活指定 单位:毫秒 private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服务所在的IP和客户端端口 private static final String NAMESPACE = "workspace";//指定后,默认操作的所有的节点都会在该工作空间下进行 //本类通过new ZkCuratorUtil()时,自动连通zkClient public ZkConnectCuratorUtil() { RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次连接失败后,重试策略 zkClient = CuratorFrameworkFactory.builder() //.authorization("digest", "root:root".getBytes())//登录超级管理(需单独配) .connectString(ZK_SERVER_IP_PORT) .sessionTimeoutMs(SESSION_TIME_OUT) .retryPolicy(retryPolicy) .namespace(NAMESPACE).build(); zkClient.start(); } public void closeZKClient() { if (zkClient != null) { this.zkClient.close(); } } public static void main(String[] args) { ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil(); boolean ifStarted=zkUtil.zkClient.isStarted(); System.out.println("当前客户的状态:" + (ifStarted ? "连接中" : "已关闭")); zkUtil.closeZKClient(); boolean ifClose = zkUtil.zkClient.isStarted(); System.out.println("当前客户的状态:" + (ifClose ? "连接成功" : "已关闭")); } }
public class CuratorDao { //使用curator(递归)添加节点 //级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持) public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception { zkClient.create() .creatingParentContainersIfNeeded()//创建父节点,如果需要的话 .withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限 .forPath(nodePath, nodeData.getBytes()); System.out.println(nodePath + "节点已成功创建…"); } //使用curator(递归)删除节点 //删除node节点及其子节点 public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception { zkClient.delete() .guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功 .deletingChildrenIfNeeded() //级联删除子节点 //.withVersion(1)//版本号可以据需使用 .forPath(nodePath); System.out.println(nodePath + "节点已删除成功…"); } //使用curator更新节点数据 //更新节点data数据 public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception { zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带 System.out.println(nodePath + "节点数据已修改成功…"); } //使用curator查询节点数据 //查询node节点数据 public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception { Stat stat = new Stat(); byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath); System.out.println("节点" + nodePath + "的数据为" + new String(data)); System.out.println("节点的版本号为:" + stat.getVersion()); } //使用curator查询节点的子节点 //打印node子节点 public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception { List<String> childNodes = zkClient.getChildren().forPath(parentNodePath); System.out.println("开始打印子节点"); for (String str : childNodes) { System.out.println(str); } } //使用curator判断节点是否存在 //判断node节点是否存在 public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception { Stat stat = zkClient.checkExists().forPath(nodePath); System.out.println(null == stat ? "节点不存在" : "节点存在"); } /**************使用Curator高级API特性之Cache缓存监控节点变化*************/ @Test public void test() throws Exception { ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil(); CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi"); CuratorDao.checkNodeExists(zkClient, "/xiaosi"); } }
cache是一种缓存机制,可以借助cache实现监听。
简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。
curator支持的cache种类有4种Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
它是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
2)Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
它是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
3)Tree Cache
Tree Cache是上两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
它是通过TreeCache类来实现的,监听器对应的接口为TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,该版本的出现是为了逐步淘汰上面3监听。
它是通过CuratorCache类来实现的,监听器对应的接口为CuratorCacheListener。
Curator一次性的watch
import org.apache.curator.framework.api.CuratorWatcher; import org.apache.zookeeper.WatchedEvent; public class MyCuratorWatcher implements CuratorWatcher { @Override public void process(WatchedEvent event) throws Exception { System.out.println("触发watcher,节点路径为:" + event.getPath()); switch (event.getType()) { case NodeCreated: break; default: break; } } } //一次性的watch public static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception { zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); }
NodeCache监听当前节点变化,通过NodeCacheListener接口持续监听节点的变化来实现
//持续监听的watch public static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception { final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCache nodeCache.start(false);//默认为false 设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空 ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) { System.out.println("NodeCache节点的初始化数据为空……"); }else { System.out.println("NodeCache节点的初始化数据为"+new String(cacheData.getData())); } //设置循环监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData cdata=nodeCache.getCurrentData(); if(null==cdata) { System.out.println("节点发生了变化,可能刚刚被删除!"); nodeCache.close();//关闭监听 }else { String data=new String(cdata.getData()); String path=nodeCache.getCurrentData().getPath(); System.out.println("节点路径"+path+"数据发生了变化,最新数据为:"+data); } } }); }
PathChildrenCache只监听子节点变化
通过PathChildrenCacheListener接口持续监听子节点来实现
//持续监听watch子节点的任何变化 public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception { final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true);//把监听节点,转换为childrenCache /** * StartMode:初始化方式 * POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case) * NORMAL:异步初始化 (不会进入下面的第一个case) * BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中) */ childrenCache.start(StartMode.NORMAL); List<ChildData> childDataList=childrenCache.getCurrentData(); System.out.println("当前节点所有子节点的数据列表如下:"); for (ChildData childData : childDataList) { System.out.println(new String(childData.getData())); } childrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case INITIALIZED: System.out.println("子节点初始化OK…"); break; case CHILD_ADDED: System.out.println("子节点"+event.getData().getPath()+"已被成功添加,数据data="+new String(event.getData().getData())); break; case CHILD_UPDATED: System.out.println("子节点"+event.getData().getPath()+"数据发生变化,新数据data="+new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("子节点"+event.getData().getPath()+"已被移除~"); break; case CONNECTION_RECONNECTED: System.out.println("正在尝试重新建立连接…"); break; case CONNECTION_SUSPENDED: System.out.println("连接状态被暂时停止…"); break; default: break; } } }); }
TreeCache是上两者的合体,既监听自身,也监听所有子节点变化
通过TreeCacheListener接口来实现
public static void treeCache(CuratorFramework zkClient) throws Exception { final String path = "/treeChildrenCache"; final TreeCache treeCache = new TreeCache(zkClient, path); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { switch (event.getType()){ case NODE_ADDED: System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath()); break; case NODE_REMOVED: System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath()); break; case NODE_UPDATED: System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath()); break; case CONNECTION_LOST: System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath()); break; case CONNECTION_RECONNECTED: System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath()); break; case CONNECTION_SUSPENDED: System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath()); break; case INITIALIZED: System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath()); break; default: break; } } }); //据需可以继续做一些其他的增删改操作 zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path); Thread.sleep(1000); zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(path + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(path); Thread.sleep(1000); zkClient.close(); }
Curator Cache,是在zk3.6新版本添加的特性,Curator需5.+
它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for**来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener);注册监听。
public static void curatorCache1(CuratorFramework zkClient) { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.build(zkClient, path); curatorCache.listenable().addListener(new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData newdata) { switch (type) { case NODE_CREATED: //各种判断 break; default: break; } } }); } public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build(); //构建监听器 //新旧对照: //1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} ); //2.path cache--> CuratorCacheListener.builder().forPathChildrenCache(); //3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache(); CuratorCacheListener listener = CuratorCacheListener.builder() .forNodeCache(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点改变了..."); } }) .build(); //添加监听 curatorCache.listenable().addListener(listener); //开启监听 curatorCache.start(); //让线程休眠30s(为了方便测试) Thread.sleep(1000 * 30); }
package org.example.zookeeper.curator; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.*; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Stat; import org.testng.annotations.Test; import java.util.List; /** * @ClassName: CuratorDao * @Description: * @Author: 88578 * @Date: 2022/5/1 14:17 */ public class CuratorDao { //使用curator(递归)添加节点 //级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持) public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception { zkClient.create() .creatingParentContainersIfNeeded()//创建父节点,如果需要的话 .withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的 .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限 .forPath(nodePath, nodeData.getBytes()); System.out.println(nodePath + "节点已成功创建…"); } //使用curator(递归)删除节点 //删除node节点及其子节点 public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception { zkClient.delete() .guaranteed() //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功 .deletingChildrenIfNeeded() //级联删除子节点 //.withVersion(1)//版本号可以据需使用 .forPath(nodePath); System.out.println(nodePath + "节点已删除成功…"); } //使用curator更新节点数据 //更新节点data数据 public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception { zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带 System.out.println(nodePath + "节点数据已修改成功…"); } //使用curator查询节点数据 //查询node节点数据 public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception { Stat stat = new Stat(); byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath); System.out.println("节点" + nodePath + "的数据为" + new String(data)); System.out.println("节点的版本号为:" + stat.getVersion()); } //使用curator查询节点的子节点 //打印node子节点 public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception { List<String> childNodes = zkClient.getChildren().forPath(parentNodePath); System.out.println("开始打印子节点"); for (String str : childNodes) { System.out.println(str); } } //使用curator判断节点是否存在 //判断node节点是否存在 public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception { Stat stat = zkClient.checkExists().forPath(nodePath); System.out.println(null == stat ? "节点不存在" : "节点存在"); } /**************使用Curator高级API特性之Cache缓存监控节点变化*************/ //一次性的watch public static void watchOnce(CuratorFramework zkClient, String nodePath) throws Exception { zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath); } //NodeCache监听当前节点变化 //通过NodeCacheListener接口持续监听节点的变化来实现 //持续监听的watch public static void watchForeverByNodeCache(CuratorFramework zkClient, String nodePath) throws Exception { final NodeCache nodeCache = new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCache nodeCache.start(false);//默认为false 设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空 ChildData cacheData = nodeCache.getCurrentData(); if (null == cacheData) { System.out.println("NodeCache节点的初始化数据为空……"); } else { System.out.println("NodeCache节点的初始化数据为" + new String(cacheData.getData())); } //设置循环监听 nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData cdata = nodeCache.getCurrentData(); if (null == cdata) { System.out.println("节点发生了变化,可能刚刚被删除!"); nodeCache.close();//关闭监听 } else { String data = new String(cdata.getData()); String path = nodeCache.getCurrentData().getPath(); System.out.println("节点路径" + path + "数据发生了变化,最新数据为:" + data); } } }); } //PathChildrenCache只监听子节点变化 //通过PathChildrenCacheListener接口持续监听子节点来实现 //持续监听watch子节点的任何变化 public static void watchForeverByPathChildrenCache(CuratorFramework zkClient, String nodePath) throws Exception { final PathChildrenCache childrenCache = new PathChildrenCache(zkClient, nodePath, true);//把监听节点,转换为childrenCache /** * StartMode:初始化方式 * POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case) * NORMAL:异步初始化 (不会进入下面的第一个case) * BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中) */ childrenCache.start(PathChildrenCache.StartMode.NORMAL); List<ChildData> childDataList = childrenCache.getCurrentData(); System.out.println("当前节点所有子节点的数据列表如下:"); for (ChildData childData : childDataList) { System.out.println(new String(childData.getData())); } childrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case INITIALIZED: System.out.println("子节点初始化OK…"); break; case CHILD_ADDED: System.out.println("子节点" + event.getData().getPath() + "已被成功添加,数据data=" + new String(event.getData().getData())); break; case CHILD_UPDATED: System.out.println("子节点" + event.getData().getPath() + "数据发生变化,新数据data=" + new String(event.getData().getData())); break; case CHILD_REMOVED: System.out.println("子节点" + event.getData().getPath() + "已被移除~"); break; case CONNECTION_RECONNECTED: System.out.println("正在尝试重新建立连接…"); break; case CONNECTION_SUSPENDED: System.out.println("连接状态被暂时停止…"); break; default: break; } } }); } //TreeCache是上两者的合体,既监听自身,也监听所有子节点变化 //通过TreeCacheListener接口来实现 public static void treeCache(CuratorFramework zkClient, String nodePath) throws Exception { // final String path = "/treeChildrenCache"; final TreeCache treeCache = new TreeCache(zkClient, nodePath); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { switch (event.getType()) { case NODE_ADDED: System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath()); break; case NODE_REMOVED: System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath()); break; case NODE_UPDATED: System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath()); break; case CONNECTION_LOST: System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath()); break; case CONNECTION_RECONNECTED: System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath()); break; case CONNECTION_SUSPENDED: System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath()); break; case INITIALIZED: System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath()); break; default: break; } } }); //据需可以继续做一些其他的增删改操作 zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath); Thread.sleep(1000); zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(nodePath + "/c1"); Thread.sleep(1000); zkClient.delete().forPath(nodePath); Thread.sleep(1000); zkClient.close(); } /*Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+ 它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache), 它通过CuratorCacheListener.builder().for***来选择对应的监听。 最后再通过curatorCache.listenable().addListener(listener);注册监听。*/ public static void curatorCache1(CuratorFramework zkClient) { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.build(zkClient, path); curatorCache.listenable().addListener(new CuratorCacheListener() { @Override public void event(Type type, ChildData oldData, ChildData newdata) { switch (type) { case NODE_CREATED: //各种判断 break; default: break; } } }); } public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException { final String path = "/curatorCache"; CuratorCache curatorCache = CuratorCache.builder(zkClient, path).build(); //构建监听器 //新旧对照: //1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} ); //2.path cache--> CuratorCacheListener.builder().forPathChildrenCache(); //3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache(); CuratorCacheListener listener = CuratorCacheListener.builder() .forNodeCache(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点改变了..."); } }) .build(); //添加监听 curatorCache.listenable().addListener(listener); //开启监听 curatorCache.start(); //让线程休眠30s(为了方便测试) Thread.sleep(1000 * 30); } @Test public void test() throws Exception { ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动 CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui"); // CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test"); // CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi"); // CuratorDao.getNodeData(zkClient,"/xiaosi/test"); // CuratorDao.printChildNodes(zkClient, "/xiaosi"); CuratorDao.checkNodeExists(zkClient, "/xiaosi"); } public static void main(String[] args) throws Exception { ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动 CuratorFramework zkClient = zkUtil.zkClient; // CuratorDao.watchOnce(zkClient, "/xiaosi/test"); // CuratorDao.watchForeverByNodeCache(zkClient, "/xiaosi/test"); // CuratorDao.watchForeverByPathChildrenCache(zkClient, "/xiaosi/test"); CuratorDao.treeCache(zkClient, "/xiaosi/test4"); CuratorDao dao = new CuratorDao(); String nodePath = "/super/succ"; dao.createNodes(zkClient, nodePath, "super");//创建节点 // dao.updateNodeData(zkClient, nodePath, "hello");//更新节点数据 // dao.deleteNodeWithChild(zkClient, nodePath); // dao.getNodeData(zkClient, nodePath); // dao.printChildNodes(zkClient, nodePath); // dao.checkNodeExists(zkClient, nodePath); // dao.watchOnce(zkClient, nodePath); // dao.watchForeverByNodeCache(zkClient, nodePath); // dao.watchForeverByPathChildrenCache(zkClient, nodePath); Thread.sleep(300000); //延迟sleep时间,便于后才修改节点,看前台是否会继续触发watch cto.closeZKClient(); } }
//本类代码,只涉及ACL操作 public class CuratorAcl { public CuratorFramework client = null; public static final String workspace="workspace"; public static final String zkServerPath = "192.168.31.216:2181"; public CuratorAcl() { RetryPolicy retryPolicy = new RetryNTimes(3, 5000); client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情况下,登录账号、密码可以通过构造参数传入,暂时固定,据需修改 .connectString(zkServerPath) .sessionTimeoutMs(20000).retryPolicy(retryPolicy) .namespace(workspace).build(); client.start(); } public void closeZKClient() { if (client != null) { this.client.close(); } } }
//把明文的账号密码转换为加密后的密文 public class AclUtils { public static String getDigestUserPwd(String loginId_Username_Passwd) { String digest = ""; try { digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } return digest; } public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception { String id = "mayun:mayun"; String idDigested = getDigestUserPwd(id); System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk= } }
public static List<ACL> getAcls() throws NoSuchAlgorithmException{
List<ACL> acls=new ArrayList<ACL>();
Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));
Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));
acls.add(new ACL(Perms.ALL, mayun));//给mayun一次性赋值所有权限
acls.add(new ACL(Perms.READ, lilei));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//给lilei分两次赋权限(目的:看不同的赋权方式)
return acls;
}
public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {
String result=cto.client.create().creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls, true)//给节点赋权限
.forPath(nodePath, nodeData.getBytes());
System.out.println("创建成功,result="+result);
}
public void getNodeData(CuratorAcl cto,String nodePath) throws Exception {
Stat stat = new Stat();
byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);
if(null!=stat) {
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());
}
}
public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {
cto.getNodeData(cto, nodePath);
System.out.println("节点修改后的数据为:"+nodeNewData);
cto.client.setData().forPath(nodePath, nodeNewData.getBytes());
System.out.println("修改成功");
}
public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {
cto.getNodeData(cto, nodePath);
System.out.println("-----------=================-------------");
//判断节点是否存在,方法一(路径前面会自动添加workspace)
Stat stat=cto.client.checkExists().forPath(nodePath);
System.out.println("======="+stat==null?"不存在":"存在");
//判断节点是否存在,方法二(路径前面需手动添加workspace)
Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);
System.out.println("======="+stat2==null?"不存在":"存在");
}
ACL权限的main方法测试
通过java代码给某个节点添加ACL权限后,后台登陆zk客户端时,是无法直接操作该节点被ACL控制的权限的操作的,要想操作具有ACL权限的节点,方法只有两个。
1、知道该节点输入用户都有哪些,用这些用户的账号密码登录
2、使用超级用户登录
#getAcl /succ/testDigest 查看都有哪些用户对该节点有操作权限
#addauth digest succ:succ 登录
public static void main(String[] args) throws Exception { CuratorAcl cto = new CuratorAcl(); boolean isZkCuratorStarted = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接成功" : "已关闭")); String nodePath1 = "/acl/tom/bin"; String nodePath2 = "/acl/father/child/sub"; // cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls());//首次创建,报错,只能创建父节点,子节点无法创建 // cto.client.setACL().withACL(getAcls()).forPath("/curatorNode");//给节点创建权限 // cto.getNodeData(cto, "/super"); // cto.getNodeData(cto, "/acl"); cto.checkNodeExists(cto, nodePath2); cto.closeZKClient(); boolean isZkCuratorStarted2 = cto.client.isStarted(); System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接成功" : "已关闭")); }
Curator的5种分布式锁及其对应的核心类:
1.重入式排它锁 Shared Reentrant Lock,实现类:InterProcessMutex
2.不可重入排它锁 Shared Lock ,实现类:InterProcessSemaphoreMutex
3.可重入读写锁 Shared Reentrant Read Write Lock,实现类: InterProcessReadWriteLock 、InterProcessLock
4.多锁对象容器(多共享锁) Multi Shared Lock,将多个锁作为单个实体管理的容器,实现类:InterProcessMultiLock、InterProcessLock
5.共享信号锁Shared Semaphore ,实现类:InterProcessSemaphoreV2
跨 JVM 工作的计数信号量。使用相同锁路径的所有 JVM 中的所有进程将实现进程间有限的租用集。此外,这个信号量大多是“公平的”——每个用户将按照请求的顺序获得租用(从 ZK 的角度来看)。
有两种模式可用于确定信号量的最大租用。在第一种模式中,最大租用是由给定路径的用户维护的约定。在第二种模式中,SharedCountReader 用作给定路径的信号量的方法,以确定最大租用。
public InterProcessMutex(CuratorFramework client, String path)
获取/释放锁的API
public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
public void release() throws Exception;//释放锁
通过release()方法释放锁。InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex, 调用下面的方法
/**
public InterProcessSemaphoreMutex(CuratorFramework client, String path)
使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入
一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。
Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire(上锁),如果请求失败,所有的锁都会被release (释放锁)。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:InterProcessMultiLock、InterProcessLock
它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。
public InterProcessMultiLock(List locks)
public InterProcessMultiLock(CuratorFramework client, List paths)
public class ZkLock { final static Logger log = LoggerFactory.getLogger(ZkLock.class); public CuratorFramework zkClient = null; // zk的客户端工具Curator(在本类通过new实例化的是,自动start) private static final int BASE_SLEEP_TIME_MS = 1000; // 连接失败后,再次重试的间隔时间 单位:毫秒 private static final int MAX_RETRY_TIMES = 10; // 定义失败重试次数 private static final int SESSION_TIME_OUT = 1000000; // 会话存活时间,根据业务灵活指定 单位:毫秒 private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服务所在的IP和客户端端口 private static final String NAMESPACE = "workspace";// 指定后,默认操作的所有的节点都会在该工作空间下进行 static int j = 10; //初始化zk客户端 public ZkLock() { // 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES); // 通过工厂建立连接 zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 连接地址 .sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重试策略 .build(); zkClient.start(); } public static void lockTest(CuratorFramework zkClient) throws InterruptedException { // 使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的 final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test"); final CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 10; i++) {//启动10个线程 new Thread(new Runnable() { @Override public void run() { try { countDownLatch.await();// 线程等待一起执行 lock.acquire();// 分布式锁,数据同步 // 处理业务 j--; System.out.println(j); } catch (Exception e) { e.printStackTrace(); } finally { try {// 释放锁 lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }, "t" + i).start(); } Thread.sleep(1000); countDownLatch.countDown();// 模拟十个线程一起并发.指定一起执行 } public static void main(String[] args) throws InterruptedException { ZkLock zkl = new ZkLock(); ZkLock.lockTest(zkl.zkClient); } }
利用Zookeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围(int、long)不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。
increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值
add():增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
使用的时候,必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。
public static void count(CuratorFramework zkClient) throws Exception { //分布式计数器 DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100)); //初始化 counter.forceSet(0); AtomicValue<Integer> value = counter.increment();//原子自增 System.out.println("原值为"+value.preValue()); System.out.println("更改后的值为"+value.postValue()); System.out.println("状态"+value.succeeded()); } public static void main(String[] args) throws Exception { ZkLock zkl=new ZkLock(); //ZkLock.lockTest(zkl.zkClient); ZkLock.count(zkl.zkClient); }
另外Curator还有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式队列DistributedQueueDistributed Queue
https://blog.csdn.net/succing/article/details/121779721
https://blog.csdn.net/succing/article/details/121793494
https://blog.csdn.net/succing/article/details/121844550
https://blog.csdn.net/succing/article/details/121802687
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。