赞
踩
Zookeeper 是一个 开源 的 分布式 的,为分布式应用提供 协调 服务的 Apache 项目。
Zookeeper 从设计模式角度来理解,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生了变化,Zookeeper 就负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
Zookeeper = 文件系统 + 通知机制
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
ZooKeeper 中不存在文件的概念,节点中存储的直接就是内容
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
(1)统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名,便于识别。
例如:IP不容易记住,而域名容易记住。
(2)统一配置管理
(3)统一集群管理
(4)服务器节点动态上下线
(5)软负载均衡
在 Zookeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求
软负载均衡即从软件层面(配置)实现负载均衡, 硬负载均衡即从硬件层面实现负载均衡。
见博客 Zookeeper 安装与部署
集群启动 Zookeeper 后,每一台机器上启动的都是服务端,要操作客户端,还需要启动客户端(最好是新开一个shell窗口单独作为客户端)。
[huwei@hadoop101 ~]$ cd /opt/module/zookeeper-3.5.7
[huwei@hadoop101 zookeeper-3.5.7]$ bin/zkCli.sh -server hadoop101:2181
由于 zookeeper 的数据都是同步的,客户端连接到 hadoop101、hadoop102、hadoop103 哪个机器都是 OK 的
(1)查看当前 znode
中所包含的节点
[zk: hadoop101:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: hadoop101:2181(CONNECTED) 1] ls /zookeeper
[config, quota]
[zk: hadoop101:2181(CONNECTED) 2] ls /zookeeper/config
[]
(2)查看当前节点详细数据
[zk: hadoop101:2181(CONNECTED) 3] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
(3)创建普通节点
无法同时创建多级节点,除非父级节点存在,也可以在创建节点时指定节点的内容
[zk: hadoop101:2181(CONNECTED) 4] create /sanguo
Created /sanguo
[zk: hadoop101:2181(CONNECTED) 5] ls /
[sanguo, zookeeper]
[zk: hadoop101:2181(CONNECTED) 6] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
当创建临时节点时,在当前客户端是能查看到的,退出当前客户端然后再重启客户端,再次查看会发现临时节点已经删除
(4)创建带序号的节点
先创建一个普通节点
[zk: hadoop101:2181(CONNECTED) 7] create /sanguo/weiguo "caocao"
Created /sanguo/weiguo
再创建带序号的节点
[zk: hadoop101:2181(CONNECTED) 8] create -s /sanguo/weiguo "caocao"
Created /sanguo/weiguo0000000002
[zk: hadoop101:2181(CONNECTED) 9] ls /sanguo
[shuguo, weiguo, weiguo0000000002]
如果节点下原来没有子节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从2开始,以此类推。
(5)获取节点的值
[zk: hadoop101:2181(CONNECTED) 10] get /sanguo/shuguo
liubei
(6)修改节点的值
[zk: hadoop101:2181(CONNECTED) 11] set /sanguo/shuguo "kongming"
[zk: hadoop101:2181(CONNECTED) 12] get /sanguo/shuguo
kongming
(7)节点的值变化监听
在 hadoop102 主机上注册监听 /sanguo
节点数据变化
[huwei@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get -w /sanguo
null
在 hadoop101 主机上修改 /sanguo
节点数据
[zk: hadoop101:2181(CONNECTED) 13] set /sanguo "simayi"
此时,在 hadoop102 主机上见听到了 /sanguo
节点数据的变化
同理,
ls
命令也可以加参数-w
,当新创建或删除文件后,可监听文件的变化
(8)查看节点的状态
[zk: hadoop101:2181(CONNECTED) 14] stat /sanguo
cZxid = 0x200000002
ctime = Sun Dec 03 15:31:07 CST 2023
mZxid = 0x200000008
mtime = Sun Dec 03 16:02:16 CST 2023
pZxid = 0x200000005
cversion = 3
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 3
(9)删除节点
[zk: hadoop101:2181(CONNECTED) 15] delete /sanguo
Node not empty: /sanguo
[zk: hadoop101:2181(CONNECTED) 16] delete /sanguo/weiguo0000000002
只能删除内容为空的节点
(10)递归删除节点
[zk: hadoop101:2181(CONNECTED) 17] deleteall /sanguo
可以递归地删除内容非空的节点
(1)创建一个Maven Module
(2)添加 pom 文件
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.7</version> </dependency> </dependencies>
(3)配置 log4j.properties
文件
需要在项目的 src/main/resources
目录下,新建一个文件,命名为log4j.properties
,在文件中填入以下内容
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
public class ZookeeperTest { private ZooKeeper zkClient; private String connectString; private int sessionTimeout; /** 获取客户端对象 */ @Before public void init() throws IOException { connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; int sessionTimeout = 10000; // 单位毫秒,一般设置10000~40000 //参数1 connectString,连接zk服务的地址 //参数2 sessionTimeout,超时时间 //参数3 当前客户端默认的监控器 zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("根据具体的逻辑进行下一步操作"); } }); } /** * 关闭客户端对象 */ @After public void close() throws InterruptedException { zkClient.close(); } }
@Test
public void create() throws InterruptedException, KeeperException {
//参数1 指定创建节点的路径
//参数2 指定要创建节点下的数据
//参数3 对操作用户进行权限控制
//参数4 节点类型、短暂、持久、短暂带序号、持久带序号
zkClient.create("/sanguo", "liubei".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
(1)获取子节点列表,不监听
/**
* 获取节点,不监听
*/
@Test
public void get() throws InterruptedException, KeeperException {
//参数1 指定获取节点的路径
//参数2 是否监听
List<String> children = zkClient.getChildren("/", false);
System.out.println(children);
for (String child : children) {
System.out.println(child); // 获取每一个节点名称
}
}
(2)获取子节点列表,并监听
/** * 获取节点,监听 */ @Test public void getAndWatch() throws InterruptedException, KeeperException { //参数1 指定获取节点的路径 //参数2 是否监听 //参数3 当前客户端默认的监控器 List<String> children = zkClient.getChildren("/", new Watcher() { public void process(WatchedEvent watchedEvent) { System.out.println("根目录下的节点有变化"); } }); System.out.println(children); for (String child : children) { System.out.println(child); // 获取每一个节点名称 } //因为设置了监听,所以当前线程不能结束 Thread.sleep(Long.MAX_VALUE); }
启动,再去根目录下创建一个新节点
[zk: localhost:2181(CONNECTED) 2] create /shuihu
Created /shuihu
查看 IDEA 终端
/**
* 判断Znode是否存在
*/
@Test
public void exist() throws InterruptedException, KeeperException {
//参数1 指定判断节点的路径
//参数2 是否监听
Stat stat = zkClient.exists("/xiyou", false);
System.out.println(stat == null ? "not exist" : "exist");
}
/** * 获取子节点存储的数据 */ @Test public void getData() throws InterruptedException, KeeperException { //判断节点是否存在 Stat stat = zkClient.exists("/sanguo", false); if (stat == null) { System.out.println("节点不存在..."); return; } //参数1 指定判断节点的路径 //参数2 是否监听 byte[] data = zkClient.getData("/sanguo", false, stat); System.out.println(new String(data)); }
/** * 设置节点的值 */ @Test public void set() throws KeeperException, InterruptedException { //判断节点是否存在 Stat stat = zkClient.exists("/sanguo", false); if (stat == null) { System.out.println("节点不存在..."); return; } //参数1 指定判断节点的路径 //参数2 节点的值 //参数3 版本号 zkClient.setData("/sanguo", "caocao".getBytes(), stat.getVersion()); }
参数3 版本号也可以写 -1,但不能不传这个参数
(1)删除空节点
/**
* 删除空节点
*/
@Test
public void delete() throws KeeperException, InterruptedException {
//判断节点是否存在
Stat stat = zkClient.exists("/aaa", false);
if (stat == null) {
System.out.println("节点不存在...");
return;
}
zkClient.delete("/aaa", stat.getVersion());
}
(2)删除非空节点,递归实现
/** * 删除非空节点,递归实现 */ //封装一个方法,方便递归调用 public void deleteAll(String path, ZooKeeper zk) throws KeeperException, InterruptedException { //判断节点是否存在 Stat stat = zkClient.exists(path, false); if (stat == null) { System.out.println("节点不存在..."); return; } //先获取当前传入节点下的所有子节点 List<String> children = zk.getChildren(path, false); if (children.isEmpty()) { //说明传入的节点没有子节点,可以直接删除 zk.delete(path, stat.getVersion()); } else { //如果传入的节点有子节点,循环所有子节点 for (String child : children) { //删除子节点,但是不知道子节点下面还有没有子节点,所以递归调用 deleteAll(path + "/" + child, zk); } //删除完所有子节点以后,记得删除传入的节点 zk.delete(path, stat.getVersion()); } } //测试deleteAll @Test public void testDeleteAll() throws KeeperException, InterruptedException { deleteAll("/shuihu", zkClient); }
持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
注意:创建 znode 时设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护,在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序
[zk: hadoop101:2181(CONNECTED) 18] stat /
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x20000000d
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
每次修改 ZooKeeper 状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID。事务ID是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生。
常见的监听
get -w path
ls -w path
(1)首先要有一个 main()
线程
(2)在 main()
线程中创建 ZooKeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener)
(3)客户端通过 connet 线程将注册的监听事件发送给 ZooKeeper
(4)在 ZooKeeper 的注册监听器列表中将注册的监听事件添加到列表中
(5)ZooKeeper 监听到有数据或路径的变化,就会将这个消息发送给 listener 线程
(6) 客户端 listener 线程内部调用 process() 方法做出相应处理
半数机制:集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装 奇数台服务器。
一般情况下 Zookeeper 集群更推荐使用奇数台机器原因?
- 在 Zookeeper 集群中 奇数台 和 偶数台(接近的台数) 机器的容错能力是一样的,所以在考虑资源节省的情况,我们推荐使用奇数台方案
- 三台机器要使得集群正常运行只能挂一台,四台机器要使得集群正常运行也只能挂一台,既然容错性是一样的,为什么不节省点计算机资源呢?
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper工作时,是有一个节点为 Leader,其他则为Follower,Leader是通过内部的 选举机制 临时产生的。
选举机制总原则:集群中的每台机器都参与投票,通过交换选票信息得到每台机器的最终得票, 一旦出现得票数超过机器总数 一半以上 数量,当前机器即为 leader。
选票过程中每台机器怎么通信的?
- 每台机器的 ip ,加上端口号
3888
以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的 Zookeeper 集群,它们的 id 从 1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么。
(1)服务器 1 启动,发起一次选举。服务器 1 投自己一票。此时服务器 1 票数一票,不够半数以上(3票),选举无法完成,服务器 1 状态保持为 LOCKING
;
(2)服务器2启动,再发起一次选举。服务器 1 和 2 分别投自己一票并交换选票信息:此时服务器 1 发现服务器2的 ID 比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器 1 票数 0 票,服务器 2 票数 2 票,没有半数以上结果,选举无法完成,服务器1,2 状态保持 LOCKING
;
(3)服务器 3 启动,发起一次选举。此时服务器 1 和 2 都会更改选票为服务器 3。此次投票结果:服务器1为0票,服务器2为0票,服务器 3 为 3 票。此时服务器 3 的票数已经超过半数,服务器 3 当选 Leader。服务器 1,2 更改状态为 FOLLOWING
,服务器3更改状态为LEADING
;
(4)服务器 4 启动,发现当前集群已经有 leader,它自己自动成为follower
(5)服务器5启动,同服务器 4一样。
以5台机器为例,当前集群正在使用(有数据/没数据),leader突然宕机的情况。
- 当集群中的leader挂掉,集群会重新选出一个leader,此时首先会比较每一台机器的czxid,czxid最大的被选为leader。极端情况,czxid都相等的情况,那么就会直接比较myid。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。