ZooKeeper is an open-source, distributed Apache project that provides coordination services for distributed frameworks.
It is a distributed service-management framework based on the observer pattern: it stores and manages the data the whole system cares about, accepts registrations from observers, and whenever that data changes it notifies every observer registered on ZooKeeper so they can react accordingly.
ZooKeeper = file system + notification mechanism
① A ZooKeeper cluster consists of one leader and multiple followers.
② The cluster keeps serving as long as more than half of its nodes are alive, so an odd number of servers is recommended.
③ Global data consistency: every server holds an identical copy of the data.
④ Requests from a client are executed in the order they were sent (FIFO).
⑤ Data updates are atomic: an update either succeeds everywhere or fails entirely.
⑥ Real-time: synchronization is fast, so clients see an up-to-date view within a bounded delay.
The ZooKeeper data model is structured much like a Unix file system: a tree in which every node is a ZNode, and each ZNode stores at most 1 MB of data by default.
1) Unified naming service: give applications/services uniform names so they are easy to identify.
2) Unified configuration management: all nodes share the same configuration; once a config file is modified, the change is synchronized to every node quickly.
3) Unified cluster management: monitor node changes in real time.
4) Dynamic server online/offline: clients perceive servers joining and leaving in real time.
5) Soft load balancing: ZooKeeper records the number of accesses of each server and routes the next client request to the least-loaded one.
Download: Apache ZooKeeper -> Download
A Java JDK must already be installed on the Linux host:
- [root@ppitm-c ~]# java -version
- openjdk version "1.8.0_275"
- OpenJDK Runtime Environment (build 1.8.0_275-b01)
- OpenJDK 64-Bit Server VM (build 25.275-b01, mixed mode)
- [root@ppitm-c ~]#
Copy the ZooKeeper tarball onto the system:
- [root@ppitm-c ~]# cd /opt
- [root@ppitm-c opt]# ll
- 总用量 534788
- -rw-r--r--. 1 root root 9311744 2月 2 23:06 apache-zookeeper-3.5.7-bin.tar.gz
Extract it:
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/
Server start script: zkServer.sh
Client start script: zkCli.sh
- [root@ppitm-c bin]# pwd
- /opt/apache-zookeeper-3.6.3-bin/bin
- [root@ppitm-c bin]# ll
- 总用量 64
- -rwxr-xr-x. 1 1000 1000 232 4月 9 2021 README.txt
- -rwxr-xr-x. 1 1000 1000 2066 4月 9 2021 zkCleanup.sh
- -rwxr-xr-x. 1 1000 1000 1158 4月 9 2021 zkCli.cmd
- -rwxr-xr-x. 1 1000 1000 1620 4月 9 2021 zkCli.sh
- -rwxr-xr-x. 1 1000 1000 1843 4月 9 2021 zkEnv.cmd
- -rwxr-xr-x. 1 1000 1000 3690 4月 9 2021 zkEnv.sh
- -rwxr-xr-x. 1 1000 1000 1286 4月 9 2021 zkServer.cmd
- -rwxr-xr-x. 1 1000 1000 4559 4月 9 2021 zkServer-initialize.sh
- -rwxr-xr-x. 1 1000 1000 11332 4月 9 2021 zkServer.sh
- -rwxr-xr-x. 1 1000 1000 988 4月 9 2021 zkSnapShotToolkit.cmd
- -rwxr-xr-x. 1 1000 1000 1377 4月 9 2021 zkSnapShotToolkit.sh
- -rwxr-xr-x. 1 1000 1000 996 4月 9 2021 zkTxnLogToolkit.cmd
- -rwxr-xr-x. 1 1000 1000 1385 4月 9 2021 zkTxnLogToolkit.sh
- [root@ppitm-c bin]#
Modify the configuration: copy/rename the sample config (zoo_sample.cfg) to zoo.cfg.
Change: dataDir=/opt/apache-zookeeper-3.6.3-bin/zkData
The default dataDir only points at a temporary location, so create a dedicated directory yourself: mkdir zkData
Start the server: bin/zkServer.sh start
Start the client: bin/zkCli.sh
1) tickTime=2000: heartbeat interval, in milliseconds, between ZooKeeper servers and between server and client.
2) initLimit=10: LF (Leader-Follower) initial connection time limit, expressed in ticks (10 heartbeats); the maximum number of heartbeats tolerated while establishing the initial connection.
3) syncLimit=5: LF sync time limit (5 heartbeats); if the leader cannot communicate with a follower within syncLimit * tickTime, the leader considers the follower dead and removes it from the server list.
4) dataDir: directory in which ZooKeeper stores its data.
5) clientPort=2181: port clients connect to (a sample zoo.cfg combining these settings follows).
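A minimal single-node zoo.cfg sketch, assuming the dataDir created in the install steps above (adjust the path to your own install location):
- tickTime=2000
- initLimit=10
- syncLimit=5
- dataDir=/opt/apache-zookeeper-3.6.3-bin/zkData
- clientPort=2181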
For example, deploy ZooKeeper on the hadoop102, hadoop103 and hadoop104 nodes.
Extract the tarball (the same one downloaded in chapter 2) into /opt on each node.
Edit the configuration file zoo.cfg and change the data directory (dataDir) as above.
In /opt/zookeeper-3.5.7/zkData create a file named myid and write the server's id into it (e.g. 2 on the 102 server, 3 on the 103 server; every server must have its own unique id).
Append the cluster configuration to the end of zoo.cfg: server.id=server address:leader-follower data port:leader election port
(e.g. server.2=192.168.16.122:2888:3888; configure this on every server)
- ########################cluster######################
- server.2=hadoop102:2888:3888
- server.3=hadoop103:2888:3888
- server.4=hadoop104:2888:3888
Leader election begins once more than half of the servers are up.
Cluster start/stop: run bin/zkServer.sh start|stop on each node, or use a cluster script such as zk.sh start|stop:
- #!/bin/bash
- case $1 in
- "start"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo ------- zookeeper $i start -------
- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
- done
- }
- ;;
- "stop"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo ------- zookeeper $i stop -------
- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
- done
- }
- ;;
- "status"){
- for i in hadoop102 hadoop103 hadoop104
- do
- echo ------- zookeeper $i status -------
- ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
- done
- }
- ;;
- esac
| Command | Description |
| --- | --- |
| ls / | List the child nodes of / |
| create <node> "data" | Create a persistent node |
| create -s <node> "data" | Create a persistent sequential node (the same name can be reused; a sequence number is appended) |
| create -e <node> "data" | Create an ephemeral node |
| get -s /sanguo | Read a node's data together with its status |
| set /sanguo "data" | Update a node's data |
- [zk: hadoop102:2181(CONNECTED) 5] ls -s /
- [zookeeper]
- cZxid = 0x0
- ctime = Thu Jan 01 08:00:00 CST 1970
- mZxid = 0x0
- mtime = Thu Jan 01 08:00:00 CST 1970
- pZxid = 0x0
- cversion = 1
- dataVersion = 0
- aclVersion = 0
- ephemeralOwner = 0x0
- dataLength = 0
- numChildren = 1
czxid: zxid of the transaction that created the node. Every change to the ZooKeeper state produces a transaction id (zxid), and zxids define a total order over all changes: each change has a unique zxid, and if zxid1 is smaller than zxid2 then the first change happened before the second.
ctime: time the znode was created, in milliseconds since 1970.
mzxid: zxid of the transaction that last updated the znode.
mtime: time the znode was last modified, in milliseconds since 1970.
pZxid: zxid of the last update to the znode's children.
cversion: child version number, i.e. how many times the znode's children have changed.
dataVersion: data version number of the znode.
aclVersion: version number of the znode's access control list.
ephemeralOwner: session id of the owner if the znode is ephemeral; 0 otherwise.
dataLength: length of the znode's data.
numChildren: number of children of the znode.
| Command | Description |
| --- | --- |
| get -w <node> | Watch the node's data for changes (one registration fires once) |
| ls -w <node> | Watch the node's children for changes (one registration fires once) |
| delete <node> | Delete a node |
| deleteall <node> | Delete a node and all of its children |
| stat <node> | Show a node's status without its data |
A watch is one-shot: each registration fires at most once, so it must be registered again after every trigger if you want continuous notifications (a Java sketch of this re-registration pattern follows).
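The Java API introduced next behaves the same way. A minimal sketch of re-registering a watch inside its own callback (class name and connect string are illustrative, not from the original setup):
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- public class ReRegisterWatchDemo {
-
-     // illustrative connect string; point it at your own server
-     private static final String CONNECT_STRING = "192.168.177.129:2181";
-
-     public static void main(String[] args) throws Exception {
-         ZooKeeper zk = new ZooKeeper(CONNECT_STRING, 100000, new Watcher() {
-             public void process(WatchedEvent event) {
-                 // default watcher: nothing to do here
-             }
-         });
-         watchChildren(zk); // register the first watch
-         Thread.sleep(Long.MAX_VALUE);
-     }
-
-     private static void watchChildren(final ZooKeeper zk) throws Exception {
-         // getChildren with an explicit Watcher; the watch is one-shot,
-         // so the callback re-registers itself to keep receiving events
-         zk.getChildren("/", new Watcher() {
-             public void process(WatchedEvent event) {
-                 System.out.println("children of / changed: " + event.getType());
-                 try {
-                     watchChildren(zk); // re-register, otherwise only the first change is seen
-                 } catch (Exception e) {
-                     e.printStackTrace();
-                 }
-             }
-         });
-     }
- }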
Import the Maven dependencies:
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.13.1</version>
- </dependency>
-
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.6.2</version>
- </dependency>
- </dependencies>
Create log4j.properties under the resources root:
- log4j.rootLogger=INFO, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern =%d %p [%c]-%m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c]-%m%n
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
- import org.junit.Before;
- import org.junit.Test;
-
- import java.io.IOException;
- import java.util.List;
-
- public class zkClient {
-
- private String connectString="192.168.177.129:2181"; // ZooKeeper connection string
- private int sessionTimeout=100000; // session timeout in ms (use a generous value)
- private ZooKeeper zkClient;
-
- @Before
- public void init() throws IOException {
- // create the client and register the default watcher
- zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- List<String> children = null; // children of "/", fetched below with the default watcher re-registered
- try {
- children = zkClient.getChildren("/", true);
- System.out.println("----------------------------------");
- for (String child : children) {
- System.out.println(child);
- }
- System.out.println("----------------------------------");
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- }
- });
-
- }
- }
- @Test
- public void create() throws KeeperException, InterruptedException {
- // create a node (path, data, ACL, node type)
- String nodeCreated = zkClient.create("/zhijia", "qyh.avi".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- @Test
- public void getChildren() throws KeeperException, InterruptedException {
-
- // List<String> children = zkClient.getChildren("/", true); // list "/" and re-register the default watcher from init()
- //
- // for (String child : children) {
- // System.out.println(child);
- // }
- Thread.sleep(50000); // keep the session alive so the watcher registered in init() can report changes
- }
- @Test
- public void exist() throws KeeperException, InterruptedException {
- // check whether a node exists
- Stat stat = zkClient.exists("/zhijia", false);
-
- System.out.println(stat == null ? "not exist" : "exist");
- }
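The remaining basic operations follow the same pattern. A minimal sketch, assumed to live in the same zkClient test class and to reuse the zkClient field initialized in init() (path and data are illustrative):
- @Test
- public void getSetDelete() throws KeeperException, InterruptedException {
-     // read the data written by create(); the last argument is an optional Stat
-     byte[] data = zkClient.getData("/zhijia", false, null);
-     System.out.println(new String(data));
-
-     // update the data; version -1 skips the optimistic version check
-     zkClient.setData("/zhijia", "new-data".getBytes(), -1);
-
-     // delete the node; it must have no children, and -1 again skips the version check
-     zkClient.delete("/zhijia", -1);
- }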
- [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
- Created /servers
- package com.zhijia.case1;
-
- import org.apache.zookeeper.*;
-
- import java.io.IOException;
-
- /** Dynamic server online/offline — server-side registration
- * @author zhijia
- * @create 2022-01-27 11:03
- */
- public class DistributeServer {
-
- private String connectString="192.168.177.129:2181";
- private int sessionTimeout=100000;
- private ZooKeeper zk;
-
- public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
-
- DistributeServer server = new DistributeServer();
- // 1. get a zk connection
- server.getConnect();
- // 2. register this server with the zk cluster
- server.regist(args[0]);
- // 3. business logic (just sleep)
- server.business();
-
- }
-
- // business logic (sleep forever)
- private void business() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
- }
-
- // register this server as an ephemeral sequential node under /servers
- private void regist(String hostname) throws KeeperException, InterruptedException { // path, data, ACL, node type
- String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- System.out.println(hostname+" is online ");
- }
-
- // connect to zk
- private void getConnect() throws IOException {
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
-
- }
- });
- }
- }
- package com.zhijia.case1;
-
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.List;
-
- /** Dynamic server online/offline — client-side listener
- * @author zhijia
- * @create 2022-01-27 11:21
- */
- public class DistributeClient {
-
- private String connectString="192.168.177.129:2181";
- private int sessionTimeout=100000;
- private ZooKeeper zk;
-
- public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
-
- DistributeClient client = new DistributeClient();
- // 1. get a zk connection
- client.getConnect();
- // 2. watch /servers for children being added or removed
- client.getServerList();
- // 3. business logic
- client.business();
- }
-
- // business logic (sleep forever)
- private void business() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
- }
-
- // connect to zk
- private void getConnect() throws IOException {
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- try {
- getServerList();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- // list /servers and re-register the watch for child additions and removals
- public void getServerList() throws KeeperException, InterruptedException {
- List<String> children = zk.getChildren("/servers", true);
-
- ArrayList<String> servers = new ArrayList<String>();
-
- for (String child : children) {
- byte[] data = zk.getData("/servers/" + child, false, null);
- servers.add(new String(data));
- }
-
- System.out.println(servers);
- }
- }
- package com.zhijia.case2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- /**
- * @author zhijia
- * @create 2022-01-27 15:09
- */
- public class DistributedLock {
-
- private final String connectString="192.168.177.129:2181";
- private final int sessionTimeout=100000;
-
- private CountDownLatch countDownLatch=new CountDownLatch(1);
- private CountDownLatch waitLatch=new CountDownLatch(1);
-
- private final ZooKeeper zk;
-
- private String waitPath;
- private String currentMode;
-
- public DistributedLock() throws IOException, InterruptedException, KeeperException {
- // establish the zk connection
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
- public void process(WatchedEvent watchedEvent) {
- // once connected, release the connect latch
- if(watchedEvent.getState()==Event.KeeperState.SyncConnected){ // session state is SyncConnected
- countDownLatch.countDown();
- }
- // the watched predecessor node was deleted: release waitLatch
- if(watchedEvent.getType()==Event.EventType.NodeDeleted&&watchedEvent.getPath().equals(waitPath)){
- waitLatch.countDown();
- }
- }
- });
-
- // wait here until the zk session is actually connected
- countDownLatch.await();
-
- // check whether the root node /locks exists
- Stat stat = zk.exists("/locks", false);
-
- if(stat==null){
- // create the root node
- zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
- }
- }
-
- // acquire the lock
- public void zkLock(){
- // create an ephemeral sequential node under /locks
- try {
- currentMode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- // if our node has the smallest sequence number we hold the lock; otherwise watch the previous node
- List<String> children = zk.getChildren("/locks", false);
-
- // only one child means the lock is ours; otherwise find our position
- if(children.size()==1){
- return;
- }else {
- // sort the children by sequence number
- Collections.sort(children);
- // our own node name relative to /locks
- String thisNode = currentMode.substring("/locks/".length());
- int index=children.indexOf(thisNode);
-
- if(index==-1){
- System.out.println("数据异常");
- }else if(index==0){
- return;
- }else {
- // watch the node immediately before ours
- waitPath="/locks/"+children.get(index-1);
- zk.getData(waitPath,true,null);
-
- // block until the predecessor node is deleted
- waitLatch.await();
- return;
- }
- }
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- // release the lock
- public void unZkLock(){
- // delete our lock node; version -1 skips the version check
- try {
- zk.delete(currentMode,-1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- }
- }
- }
- package com.zhijia.case2;
-
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
-
- /**
- * @author zhijia
- * @create 2022-01-27 16:10
- */
- public class DistributeLockTest {
- public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
- final DistributedLock lock1 = new DistributedLock();
- final DistributedLock lock2 = new DistributedLock();
-
- new Thread(new Runnable() {
- public void run() {
-
- try {
- lock1.zkLock();
- System.out.println("线程1 启动:获取到锁");
-
- Thread.sleep(10*1000);
- lock1.unZkLock();
- System.out.println("线程1 :释放锁");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
-
-
- new Thread(new Runnable() {
- public void run() {
-
- try {
- lock2.zkLock();
- System.out.println("线程2 启动:获取到锁");
-
- Thread.sleep(10*1000);
- lock2.unZkLock();
- System.out.println("线程2 :释放锁");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
- }
① The session connection is asynchronous; you have to handle it yourself, for example with a CountDownLatch (see the sketch after this list).
② Watches must be re-registered after every trigger, otherwise they stop firing.
③ Overall development complexity is fairly high.
④ There is no multi-level (recursive) create or delete; you have to recurse yourself.
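A minimal sketch of handling drawback ① with a CountDownLatch (the class name is illustrative; the pattern is the same one used in the DistributedLock constructor above):
- import java.io.IOException;
- import java.util.concurrent.CountDownLatch;
-
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- public class ConnectedClient {
-
-     // block until the session is really connected before handing out the client
-     public static ZooKeeper connect(String connectString, int sessionTimeout)
-             throws IOException, InterruptedException {
-         final CountDownLatch connectedLatch = new CountDownLatch(1);
-
-         ZooKeeper zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-             public void process(WatchedEvent event) {
-                 // the constructor returns immediately; the session is usable
-                 // only after this SyncConnected event arrives
-                 if (event.getState() == Event.KeeperState.SyncConnected) {
-                     connectedLatch.countDown();
-                 }
-             }
-         });
-
-         connectedLatch.await();
-         return zk;
-     }
- }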
Official site: http://curator.apache.org/index.html
① Add the dependencies
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>4.3.0</version>
- </dependency>
② Code implementation
- package com.zhijia.case3;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- /** Obtain a ZooKeeper client connection via Curator and test a reentrant distributed lock
- * @author zhijia
- * @create 2022-01-28 9:47
- */
- public class CuratorLockTest {
- public static void main(String[] args) {
- // distributed lock 1
- final InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- // distributed lock 2
- final InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- new Thread(new Runnable() {
- public void run() {
- // acquire the lock (acquired twice to show reentrancy)
- try {
- lock1.acquire();
- System.out.println("Thread 1: lock acquired");
-
- lock1.acquire();
- System.out.println("Thread 1: lock acquired again (reentrant)");
-
- Thread.sleep(5000);
-
- lock1.release();
- System.out.println("Thread 1: lock released");
-
- lock1.release();
- System.out.println("Thread 1: lock released again");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- public void run() {
- // acquire the lock (acquired twice to show reentrancy)
- try {
- lock2.acquire();
- System.out.println("Thread 2: lock acquired");
-
- lock2.acquire();
- System.out.println("Thread 2: lock acquired again (reentrant)");
-
- Thread.sleep(5000);
-
- lock2.release();
- System.out.println("Thread 2: lock released");
-
- lock2.release();
- System.out.println("Thread 2: lock released again");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
-
- public static CuratorFramework getCuratorFramework() {
-
- // retry policy (base sleep time in ms between retries, max number of retries)
- ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
-
- // build the client (connect string, connection timeout, retry policy)
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.177.129:2181")
- .connectionTimeoutMs(10000)
- .retryPolicy(policy).build();
-
- // start the client
- client.start();
-
- System.out.println("zookeeper启动成功");
-
- return client;
- }
- }
Majority mechanism: a vote passes as soon as more than half of the servers agree.
On first startup, servers keep switching their votes to the candidate with the larger server id until one candidate holds more than half of the votes and becomes the leader. For example, with five servers started one after another, servers 1 and 2 re-vote for server 3 when it starts, giving it 3 of 5 votes, so server 3 becomes the leader.
When a new leader must be elected while the cluster is already running, the comparison rules are:
① the candidate with the larger EPOCH wins outright;
② if the EPOCHs are equal, the candidate with the larger transaction id (zxid) wins;
③ if the zxids are also equal, the candidate with the larger server id wins.
Install an odd number of ZooKeeper servers.
Production experience:
⚫ 10 servers: 3 zk nodes
⚫ 20 servers: 5 zk nodes
⚫ 100 servers: 11 zk nodes
⚫ 200 servers: 11 zk nodes
More ZooKeeper servers improve reliability but increase communication latency.
Commonly used client commands: ls, get, create, delete.