赞
踩
Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
Zookeeper 从设计模式角度来理解:是一个基于观案者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1 MB 的数据,每个 ZNode 都可以通过其路径唯一标识。
提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务
在分布式环境下,经常需要对应用/服务进行统一命名 ,便于识别。例如:IP 不容易记住,而域名容易记住。
统一配置管理
(1)分布式环境下,配置文件同步非常常见。
① 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
② 对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由 ZooKeeper 实现。
① 可将配置信息写入 ZooKeeper 上的一个 Znode 。
② 各个客户端服务器监听这个 Znode。
③ 一旦 Znode 中的数据被修改,ZooKeeper 将通知各个客户端服务器。
统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。
可根据节点实时状态做出一些调整。
(2)ZooKeeper 可以实现实时监控节点状态变化
① 可将节点信息写入Z ooKeeper 上的一个 ZNode。
② 监听这个 ZNode 可获取它的实时状态变化。
服务器动态上下线
软负载均衡
在 Zookeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。
准备工作
① 拷贝 Zookeeper 安装包到 Linux 系统下(apache-zookeeper-3.5.6-bin.tar.gz)
② 解压到指定目录
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /hadoop/
③ 重命名
mv apache-zookeeper-3.5.6-bin/ zookeeper-3.5.6
修改配置
① 将 /hadoop/zookeeper-3.5.6/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打开 zoo.cfg 文件,修改 dataDir 路径
dataDir=/hadoop/zookeeper-3.5.6/zkData
③ 在 /hadoop/zookeeper-3.5.6/ 这个目录上创建 zkData 文件夹
mkdir zkData
修改环境变量
① 打开配置文件
vim /etc/profile
② 在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin
③ 使配置文件生效
source /etc/profile
操作 Zookeeper
① 启动 Zookeeper
zkServer.sh start
② 查看进程是否启动
jps
③ 查看状态
zkServer.sh status
④ 启动客户端
zkCli.sh
⑤ 退出客户端
quit
⑥ 停止 Zookeeper
zkServer.sh stop
集群规划
在 master、slave1 和 slave2 三个节点上部署 Zookeeper。
解压安装
① 解压 Zookeeper 安装包到 /hadoop/ 目录下
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /hadoop/
③ 重命名
mv apache-zookeeper-3.5.6-bin/ zookeeper-3.5.6
③ 同步 /hadoop/zookeeper-3.5.6 目录内容到 slave1、slave2
xsync zookeeper-3.5.6/
配置服务器编号
① 在 /hadoop/zookeeper-3.5.6/ 这个目录下创建 zkData
mkdir zkData
② /hadoop/zookeeper-3.5.6/zkData 目录下创建一个 myid 的文件
touch myid
③ 编辑 myid 文件
vim myid
在文件中添加与 server 对应的编号:
0
④ 分发到其他机器上
xsync myid
并分别在 slave1、slave2 上修改 myid 文件中内容为 1、2
配置 zoo.cfg 文件
① 将 /hadoop/zookeeper-3.5.6/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打开 zoo.cfg 文件,修改 dataDir 路径
dataDir=/hadoop/zookeeper-3.5.6/zkData
增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
③ 同步 zoo.cfg 配置文件
xsync zoo.cfg
修改环境变量
① 打开配置文件
vim /etc/profile
② 在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=$PATH:$ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三台机器)
source /etc/profile
集群操作
① 三台机器分别启动 Zookeeper
zkServer.sh start
② 三台机器分别关闭 Zookeeper
zkServer.sh stop
编写 Zookeeper 的群起群关脚本
① 在 /usr/local/bin 目录下创建 zk 文件
vim zk
在文件中输入以下内容:
#!/bin/bash case $1 in "start"){ for i in master slave1 slave2 do echo "****************** $i *********************" ssh $i "source /etc/profile && zkServer.sh start" done };; "stop"){ for i in master slave1 slave2 do echo "****************** $i *********************" ssh $i "source /etc/profile && zkServer.sh stop" done };; esac
② 修改脚本 zk 具有执行权限
chmod 777 zk
③ 调用脚本形式:zk start 或 zk stop
Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒
Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime 时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session 的最小超时时间是 2*tickTime)
initLimit =10:LF 初始通信时限
集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。
syncLimit =5:LF 同步通信时限
集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。
dataDir:数据文件目录+数据持久化路径
主要用于保存 Zookeeper 中的数据。
clientPort =2181:客户端连接端口
监听客户端连接的端口。
server.A=B:C:D
A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个server。
B 是这个服务器的 ip 地址;
C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
半数机制
集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。
Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。
选举过程例子
假设有五台服务器组成的 Zookeeper 集群,它们的 id 从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动。
① 服务器 1 启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。
② 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。
③ 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。
④ 服务器 4 启动,根据前面的分析,理论上服务器4应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以它只能接收当小弟的命了。
⑤ 服务器 5 启动,同 4 一样当小弟。
持久(Persistent)
客户端和服务器端断开连接后,创建的节点不删除
短暂(Ephemeral)
客户端和服务器端断开连接后,创建的节点自己删除
节点类型
① 持久化目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在。
② 持久化顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号
③ 临时目录节点
客户端与 Zookeeper 断开连接后,该节点被删除
④ 临时顺序编号目录节点
客户端与 Zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号。
说明: 创建 znode 时设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
注意: 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。
czxid: 创建节点的事务 zxid
每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeepe r事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。
ctime: znode 被创建的毫秒数(从 1970 年开始)
mzxid: znode 最后更新的事务 zxid
mtime: znode 最后修改的毫秒数(从 1970 年开始)
pZxid: znode 最后更新的子节点 zxid
cversion : znode 子节点变化号,znode 子节点修改次数
dataversion: znode 数据变化号
aclVersion: znode 访问控制列表的变化号
ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
dataLength: znode 的数据长度
numChildren: znode 子节点数量
监听原理详解:
① 首先要有一个 main() 线程
② 在 main 线程中创建 Zokeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener) 。
③ 通过 connect 线程将注册的监听事件发送给 Zookeeper。
④ 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中。
⑤ Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程。
⑥ listener 线程内部调用了 process() 方法。
常见的监听
① 监听节点数据的变化
get -w path
② 监听子节点增减的变化
ls -w path
zkCli.sh
help
ls /
ls -s /
create /animals "dog"
create /animals/small "ant"
get /animals
get /animals/small
create -e /animals/big "elephant"
create -s /animals/middle "hourse"
set /animals/small "bug"
① 在 slave1 主机上注册监听 /animals 节点数据变化
get -w /animals
② 在 slave2 主机上修改 /animals 节点的数据
set /animals "cat"
③ 观察 slave1 主机收到子节点变化的监听
① 在 slave1 主机上注册监听 /animals 节点的子节点变化
ls -w /animals
② 在 slave2 主机 /animals 节点上创建子节点
create /animals/mini "fly"
③ 观察 slave1 主机收到子节点变化的监听
delete /animals/big
deleteall /animals/mini
stat /animals
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.5.6</version> </dependency> </dependencies>
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
package zookeeper; import org.apache.zookeeper.*; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class TestZookeeper { private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; @Test public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { } }); }
先将上面的 init() 方法前面的注解 @Test 改为 @Before
// 创建子节点
@Test
public void createNode() throws Exception {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型
String path = zkClient.create("/demo", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
// 获取子节点并监听节点变化
@Test
public void getChildrenAndWatch() throws Exception {
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
@Before public void init() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent watchedEvent) { List<String> children = null; try { children = zkClient.getChildren("/", true); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } for (String child : children) { System.out.println(child); } } }); }
// 判断znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists("/animals", false);
System.out.println(stat == null ? "not exist" : "exist");
}
需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
需求分析
代码实现
① 先在集群上创建 /servers 节点
create /servers "servers"
② 服务器端向 Zookeeper 注册代码
package zookeeper; import org.apache.zookeeper.*; import java.io.IOException; public class DistributeServer { private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws Exception { args = new String[]{"slave1"}; DistributeServer server = new DistributeServer(); // 1.连接zookeeper集群 server.getConnect(); // 2.注册节点 server.register(args[0]); // 3.业务逻辑处理 server.business(); } private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { } }); } private void register(String hostname) throws KeeperException, InterruptedException { String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname + " is online"); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } }
③ 客户端代码
package zookeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class DistributeClient { private String connectString = "master:2181,slave1:2181,slave2:2181"; private int sessionTimeout = 2000; private ZooKeeper zkClient; public static void main(String[] args) throws Exception { DistributeClient client = new DistributeClient(); // 1.连接zookeeper集群 client.getConnect(); // 2.注册监听 client.getChildren(); // 3.业务逻辑处理 client.business(); } private void getConnect() throws IOException { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { public void process(WatchedEvent event) { try { getChildren(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); } private void getChildren() throws KeeperException, InterruptedException { List<String> children = zkClient.getChildren("/servers", true); // 存储服务器节点主机名称集合 ArrayList<String> hosts = new ArrayList<String>(); for (String child : children) { byte[] data = zkClient.getData("/servers/" + child, false, null); hosts.add(new String(data)); } System.out.println(hosts); } private void business() throws InterruptedException { Thread.sleep(Long.MAX_VALUE); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。