当前位置:   article > 正文

Zookeeper详解

zookeeper详解

1.Zookeeper概述

1.Zookeeper概念

Zookeeper是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务

Zookeeper 翻译过来就是动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小猪)的管理员。简称zk

Hadoop: 存储海量数据和分析海量数据的工具

Hive: 基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载

Pig: 基于Hadoop的大规模数据分析平台

Zookeeper 是一个开源的,分布式应用程序的协调服务

Zookeeper 提供的主要功能包括:

注册中心:比如Dubbo的注册中心

分布式锁:比如卖电影票有好多入口:官网、猫眼、淘票票

配置管理:比如用来保存项目的各种配置信息(数据库连接信息);常用的配置中心:Apollo(百度开源)、Nacos(阿里开源)

2.ZooKeeper 命令操作

1.数据模型

ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

这里面的每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息。

节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下

节点可以分为四大类:

持久节点(PERSISTENT):创建后一直存在,直到主动删除此节点

临时节点(EPHEMERAL):生命周期依赖于客户端会话,对应客户端会话失效后节点自动清除:-e

持久顺序节点(PERSISTENT_SEQUENTIAL) :持久顺序节点,创建后一直存在,直到主动删除此节点:-s

临时顺序节点(EPHEMERAL_SEQUENTIAL) :临时节点在客户端会话失效后节点自动清:-es

2.服务端命令

启动 ZooKeeper 服务: ./zkServer.sh start

查看 ZooKeeper 服务状态: ./zkServer.sh status

停止 ZooKeeper 服务: ./zkServer.sh stop

重启 ZooKeeper 服务: ./zkServer.sh restart

3.客户端常用命令

连接ZooKeeper服务端

./zkCli.sh -server localhost:2181

若为连接本机可直接

./zkCli.sh

显示指定目录下节点 

查看根节点
ls /

查看指定节点
ls /zookeeper

 

创建根节点

create /节点path value 

获取节点

create /节点path value 

设置节点

set /节点path value 

创建子节点

create /节点path/子节点 

删除单个节点

delete /节点path 

删除带有子节点的节点

deleteall /节点path 

查看命令帮助

help 

4.创建临时节点和顺序节点

创建临时节点

create -e /节点path value 

再次使用zk-cli连接服务端,验证刚才创建的临时节点已经没了

创建顺序节点,zk会自动在节点路径后边添加序号

create -s /节点path value

创建临时顺序节点

create -es /节点path value

czxid:节点被创建的事务ID

ctime: 创建时间

mZxid: 最后一次被更新的事务ID

mtime: 修改时间

pzxid:子节点列表最后一次被更新的事务ID

cversion:子节点的版本号

dataversion:数据版本号

aclversion:权限版本号

ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持久节点则为0

dataLength:节点存储的数据的长度

numChildren:当前节点的子节点个数 

查询节点详细信息

ls –s /节点path 

2..ZooKeeper JavaAPI 操作 

1.Curator介绍

Curator 是 Apache ZooKeeper 的Java客户端库,目标是简化 ZooKeeper 客户端的使用

常见的ZooKeeper Java API :

原生Java API

ZkClient

Curator

Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目

官网:Welcome to Apache Curator | Apache Curator

2.建立连接

创建项目curator-zk

pom.xml中添加Curator和日志坐标 

  1. <properties>
  2. <maven.compiler.source>8</maven.compiler.source>
  3. <maven.compiler.target>8</maven.compiler.target>
  4. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  5. </properties>
  6. <dependencies>
  7. <!--单元测试-->
  8. <dependency>
  9. <groupId>junit</groupId>
  10. <artifactId>junit</artifactId>
  11. <version>4.10</version>
  12. <scope>test</scope>
  13. </dependency>
  14. <!--curator-->
  15. <dependency>
  16. <groupId>org.apache.curator</groupId>
  17. <artifactId>curator-framework</artifactId>
  18. <version>4.0.0</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.curator</groupId>
  22. <artifactId>curator-recipes</artifactId>
  23. <version>4.0.0</version>
  24. </dependency>
  25. <!--日志-->
  26. <dependency>
  27. <groupId>org.slf4j</groupId>
  28. <artifactId>slf4j-api</artifactId>
  29. <version>1.7.21</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.slf4j</groupId>
  33. <artifactId>slf4j-log4j12</artifactId>
  34. <version>1.7.21</version>
  35. </dependency>
  36. </dependencies>
  37. <build>
  38. <plugins>
  39. <plugin>
  40. <groupId>org.apache.maven.plugins</groupId>
  41. <artifactId>maven-compiler-plugin</artifactId>
  42. <version>3.1</version>
  43. <configuration>
  44. <source>1.8</source>
  45. <target>1.8</target>
  46. </configuration>
  47. </plugin>
  48. </plugins>
  49. </build>

日志配置文件:log4j.properties

  1. log4j.rootLogger=off,stdout
  2. log4j.appender.stdout = org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.Target = System.out
  4. log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
  5. log4j.appender.stdout.layout.ConversionPattern = [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n

在java测试包下创建com\curator包创建测试类CuratorTest

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import org.junit.Test;
  7. public class CuratorTest {
  8. /*
  9. * 建立连接
  10. */
  11. @Test
  12. public void testConnect() {
  13. /*
  14. * connectString 连接字符串。zk server 地址和端口
  15. * sessionTimeoutMs 会话差事时间 单位ms
  16. * connectionTimeoutMs 连接超时时间 单位ms
  17. * retryPolicy 重试策略
  18. */
  19. //重试策略
  20. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  21. //第一种方式
  22. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  23. 60*1000,15*1000,retryPolicy);*/
  24. //常用第二种方式,更直观
  25. CuratorFramework client = CuratorFrameworkFactory.builder()
  26. .connectString("192.168.235.129:2181")
  27. .sessionTimeoutMs(60 * 1000)
  28. .connectionTimeoutMs(15 * 1000)
  29. .retryPolicy(retryPolicy)
  30. .namespace("ljb")
  31. .build();
  32. //开启连接
  33. client.start();
  34. }
  35. }

3.创建节点

@Before:初始化方法(对于每一个测试方法都要执行一次)

@After:释放资源(对于每一个测试方法运行完后都要执行一次)

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. import org.apache.zookeeper.CreateMode;
  7. import org.junit.After;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. public class CuratorTest {
  11. /*
  12. * 建立连接
  13. */
  14. private CuratorFramework client;
  15. @Before
  16. public void testConnect() {
  17. /*
  18. * connectString 连接字符串。zk server 地址和端口
  19. * sessionTimeoutMs 会话差事时间 单位ms
  20. * connectionTimeoutMs 连接超时时间 单位ms
  21. * retryPolicy 重试策略
  22. */
  23. //重试策略
  24. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  25. //第一种方式
  26. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  27. 60*1000,15*1000,retryPolicy);*/
  28. //常用第二种方式,更直观
  29. client = CuratorFrameworkFactory.builder()
  30. .connectString("192.168.235.129:2181")
  31. .sessionTimeoutMs(60 * 1000)
  32. .connectionTimeoutMs(15 * 1000)
  33. .retryPolicy(retryPolicy)
  34. .namespace("ljb")
  35. .build();
  36. //开启连接
  37. client.start();
  38. }
  39. /*
  40. * 创建节点:create 持久 临时 顺序 数据
  41. * 1.基本创建
  42. * 2.创建节点 带有数据
  43. * 3.设置节点的类型
  44. * 4.创建多级节点 /app1/p1
  45. */
  46. @Test
  47. public void testCreate() throws Exception {
  48. //1. 基本创建 :create().forPath("")
  49. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  50. String path = client.create().forPath("/app1");
  51. System.out.println(path);
  52. }
  53. @Test
  54. public void testCreate2() throws Exception {
  55. //2. 创建节点 带有数据:create().forPath("",data)
  56. //节点默认类型:持久化
  57. String path = client.create().forPath("/app2", "hehe".getBytes());
  58. System.out.println(path);
  59. }
  60. @Test
  61. public void testCreate3() throws Exception {
  62. //3. 设置节点的类型:create().withMode().forPath("",data)
  63. //创建临时节点
  64. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  65. System.out.println(path);
  66. }
  67. @Test
  68. public void testCreate4() throws Exception {
  69. //创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
  70. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  71. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  72. System.out.println(path);
  73. }
  74. @After
  75. public void close() {
  76. if (client != null) {
  77. client.close();
  78. }
  79. }
  80. }

4.查询节点

  1. //====================get===============================
  2. /**
  3. * 查询节点:
  4. * 1. 查询数据:get: getData().forPath()
  5. * 2. 查询子节点: ls: getChildren().forPath()
  6. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
  7. */
  8. @Test
  9. public void testGet1() throws Exception {
  10. //1. 查询数据:get
  11. byte[] data = client.getData().forPath("/app1");
  12. System.out.println(new String(data));
  13. }
  14. @Test
  15. public void testGet2() throws Exception {
  16. // 2. 查询子节点: ls
  17. List<String> path = client.getChildren().forPath("/");
  18. System.out.println(path);
  19. }
  20. @Test
  21. public void testGet3() throws Exception {
  22. Stat status = new Stat();
  23. System.out.println(status);
  24. //3. 查询节点状态信息:ls -s
  25. // store status in ...
  26. client.getData().storingStatIn(status).forPath("/app1");
  27. System.out.println(status);
  28. }

5.修改节点 

  1. //=====================set==================
  2. /**
  3. * 修改数据
  4. * 1. 基本修改数据:setData().forPath()
  5. * 2. 根据版本修改: setData().withVersion().forPath()
  6. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
  7. *
  8. * @throws Exception
  9. */
  10. @Test
  11. public void testSet() throws Exception {
  12. client.setData().forPath("/app1", "ljb".getBytes());
  13. }
  14. @Test
  15. public void testSetForVersion() throws Exception {
  16. Stat status = new Stat();
  17. //3. 查询节点状态信息:ls -s
  18. client.getData().storingStatIn(status).forPath("/app1");
  19. int version = status.getVersion();//查询出来的 3
  20. System.out.println(version);
  21. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
  22. }

6.删除结点

  1. //=================delete=================
  2. /**
  3. * 删除节点: delete deleteall
  4. * 1. 删除单个节点:delete().forPath("/app1");
  5. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
  6. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
  7. * 4. 回调:inBackground
  8. */
  9. @Test
  10. public void testDelete() throws Exception {
  11. // 1. 删除单个节点
  12. client.delete().forPath("/app1");
  13. }
  14. @Test
  15. public void testDelete2() throws Exception {
  16. //2. 删除带有子节点的节点
  17. client.delete().deletingChildrenIfNeeded().forPath("/app4");
  18. }
  19. @Test
  20. public void testDelete3() throws Exception {
  21. //3. 必须成功的删除, 底层是自动重试
  22. client.delete().guaranteed().forPath("/app2");
  23. }
  24. @Test
  25. public void testDelete4() throws Exception {
  26. //4. 回调
  27. client.delete().guaranteed().inBackground(new BackgroundCallback(){
  28. @Override
  29. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  30. System.out.println("我被删除了~");
  31. System.out.println(event);
  32. }
  33. }).forPath("/app1");
  34. }

完整代码 

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.api.BackgroundCallback;
  6. import org.apache.curator.framework.api.CuratorEvent;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.data.Stat;
  10. import org.junit.After;
  11. import org.junit.Before;
  12. import org.junit.Test;
  13. import java.util.List;
  14. public class CuratorTest {
  15. /*
  16. * 建立连接
  17. */
  18. private CuratorFramework client;
  19. @Before
  20. public void testConnect() {
  21. /*
  22. * connectString 连接字符串。zk server 地址和端口
  23. * sessionTimeoutMs 会话差事时间 单位ms
  24. * connectionTimeoutMs 连接超时时间 单位ms
  25. * retryPolicy 重试策略
  26. */
  27. //重试策略
  28. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  29. //第一种方式
  30. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  31. 60*1000,15*1000,retryPolicy);*/
  32. //常用第二种方式,更直观
  33. client = CuratorFrameworkFactory.builder()
  34. .connectString("192.168.235.129:2181")
  35. .sessionTimeoutMs(60 * 1000)
  36. .connectionTimeoutMs(15 * 1000)
  37. .retryPolicy(retryPolicy)
  38. .namespace("ljb")
  39. .build();
  40. //开启连接
  41. client.start();
  42. }
  43. //======================create=====================
  44. /*
  45. * 创建节点:create 持久 临时 顺序 数据
  46. * 1.基本创建
  47. * 2.创建节点 带有数据
  48. * 3.设置节点的类型
  49. * 4.创建多级节点 /app1/p1
  50. */
  51. @Test
  52. public void testCreate() throws Exception {
  53. //1. 基本创建 :create().forPath("")
  54. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  55. String path = client.create().forPath("/app1");
  56. System.out.println(path);
  57. }
  58. @Test
  59. public void testCreate2() throws Exception {
  60. //2. 创建节点 带有数据:create().forPath("",data)
  61. //节点默认类型:持久化
  62. String path = client.create().forPath("/app2", "hehe".getBytes());
  63. System.out.println(path);
  64. }
  65. @Test
  66. public void testCreate3() throws Exception {
  67. //3. 设置节点的类型:create().withMode().forPath("",data)
  68. //创建临时节点
  69. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  70. System.out.println(path);
  71. }
  72. @Test
  73. public void testCreate4() throws Exception {
  74. //创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
  75. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  76. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  77. System.out.println(path);
  78. }
  79. //====================get===============================
  80. /**
  81. * 查询节点:
  82. * 1. 查询数据:get: getData().forPath()
  83. * 2. 查询子节点: ls: getChildren().forPath()
  84. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
  85. */
  86. @Test
  87. public void testGet1() throws Exception {
  88. //1. 查询数据:get
  89. byte[] data = client.getData().forPath("/app1");
  90. System.out.println(new String(data));
  91. }
  92. @Test
  93. public void testGet2() throws Exception {
  94. // 2. 查询子节点: ls
  95. List<String> path = client.getChildren().forPath("/");
  96. System.out.println(path);
  97. }
  98. @Test
  99. public void testGet3() throws Exception {
  100. Stat status = new Stat();
  101. System.out.println(status);
  102. //3. 查询节点状态信息:ls -s
  103. // store status in ...
  104. client.getData().storingStatIn(status).forPath("/app1");
  105. System.out.println(status);
  106. }
  107. //=====================set==================
  108. /** 修改数据
  109. * 1. 基本修改数据:setData().forPath()
  110. * 2. 根据版本修改: setData().withVersion().forPath()
  111. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
  112. */
  113. @Test
  114. public void testSet() throws Exception {
  115. client.setData().forPath("/app1", "ljb".getBytes());
  116. }
  117. @Test
  118. public void testSetForVersion() throws Exception {
  119. Stat status = new Stat();
  120. //3. 查询节点状态信息:ls -s
  121. client.getData().storingStatIn(status).forPath("/app1");
  122. int version = status.getVersion();//查询出来的 3
  123. System.out.println(version);
  124. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
  125. }
  126. //=================delete=================
  127. /**
  128. * 删除节点: delete deleteall
  129. * 1. 删除单个节点:delete().forPath("/app1");
  130. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
  131. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
  132. * 4. 回调:inBackground
  133. */
  134. @Test
  135. public void testDelete() throws Exception {
  136. // 1. 删除单个节点
  137. client.delete().forPath("/app1");
  138. }
  139. @Test
  140. public void testDelete2() throws Exception {
  141. //2. 删除带有子节点的节点
  142. client.delete().deletingChildrenIfNeeded().forPath("/app4");
  143. }
  144. @Test
  145. public void testDelete3() throws Exception {
  146. //3. 必须成功的删除, 底层是自动重试
  147. client.delete().guaranteed().forPath("/app2");
  148. }
  149. @Test
  150. public void testDelete4() throws Exception {
  151. //4. 回调
  152. client.delete().guaranteed().inBackground(new BackgroundCallback(){
  153. @Override
  154. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  155. System.out.println("我被删除了~");
  156. System.out.println(event);
  157. }
  158. }).forPath("/app1");
  159. }
  160. @After
  161. public void close() {
  162. if (client != null) {
  163. client.close();
  164. }
  165. }
  166. }

7.Watch监听概念 

ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性

ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者

ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便,需要开发人员自己反复注册Watcher,比较繁琐

Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听

ZooKeeper提供了三种Watcher:

NodeCache : 只监听某一个特定的节点

PathChildrenCache : 监控一个ZNode的子节点 

TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

8.Watch监听-NodeCache

只监听某一个特定的节点

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.cache.NodeCache;
  6. import org.apache.curator.framework.recipes.cache.NodeCacheListener;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.junit.After;
  9. import org.junit.Before;
  10. import org.junit.Test;
  11. public class CuratorWatcherTest {
  12. /*
  13. * 建立连接
  14. */
  15. private CuratorFramework client;
  16. @Before
  17. public void testConnect() {
  18. /*
  19. * connectString 连接字符串。zk server 地址和端口
  20. * sessionTimeoutMs 会话差事时间 单位ms
  21. * connectionTimeoutMs 连接超时时间 单位ms
  22. * retryPolicy 重试策略
  23. */
  24. //重试策略
  25. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  26. //第一种方式
  27. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  28. 60*1000,15*1000,retryPolicy);*/
  29. //常用第二种方式,更直观
  30. client = CuratorFrameworkFactory.builder()
  31. .connectString("192.168.235.129:2181")
  32. .sessionTimeoutMs(60 * 1000)
  33. .connectionTimeoutMs(15 * 1000)
  34. .retryPolicy(retryPolicy)
  35. .namespace("ljb")
  36. .build();
  37. //开启连接
  38. client.start();
  39. }
  40. @After
  41. public void close() {
  42. if (client != null) {
  43. client.close();
  44. }
  45. }
  46. /**
  47. * 演示 NodeCache:给指定一个节点注册监听器
  48. */
  49. @Test
  50. public void testNodeCache() throws Exception {
  51. //1. 创建NodeCache对象
  52. NodeCache nodeCache = new NodeCache(client,"/app1");
  53. //2. 注册监听
  54. nodeCache.getListenable().addListener(new NodeCacheListener() {
  55. @Override
  56. public void nodeChanged() throws Exception {
  57. System.out.println("节点变化了~");
  58. //获取修改节点后的数据
  59. byte[] data = nodeCache.getCurrentData().getData();
  60. System.out.println(new String(data));
  61. }
  62. });
  63. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
  64. nodeCache.start(true);
  65. while (true){
  66. }
  67. }
  68. }

9.Watch监听-PathChildrenCache

监控一个ZNode的子节点

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.cache.*;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.junit.After;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. public class CuratorWatcherTest {
  11. /*
  12. * 建立连接
  13. */
  14. private CuratorFramework client;
  15. @Before
  16. public void testConnect() {
  17. /*
  18. * connectString 连接字符串。zk server 地址和端口
  19. * sessionTimeoutMs 会话差事时间 单位ms
  20. * connectionTimeoutMs 连接超时时间 单位ms
  21. * retryPolicy 重试策略
  22. */
  23. //重试策略
  24. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  25. //第一种方式
  26. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  27. 60*1000,15*1000,retryPolicy);*/
  28. //常用第二种方式,更直观
  29. client = CuratorFrameworkFactory.builder()
  30. .connectString("192.168.235.129:2181")
  31. .sessionTimeoutMs(60 * 1000)
  32. .connectionTimeoutMs(15 * 1000)
  33. .retryPolicy(retryPolicy)
  34. .namespace("ljb")
  35. .build();
  36. //开启连接
  37. client.start();
  38. }
  39. @After
  40. public void close() {
  41. if (client != null) {
  42. client.close();
  43. }
  44. }
  45. @Test
  46. public void testPathChildrenCache() throws Exception {
  47. //1.创建监听对象
  48. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
  49. //2. 绑定监听器
  50. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override
  51. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  52. System.out.println("子节点变化了~");
  53. System.out.println(event);
  54. //监听子节点的数据变更,并且拿到变更后的数据
  55. //1.获取类型
  56. PathChildrenCacheEvent.Type type = event.getType();
  57. //2.判断类型是否是update
  58. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  59. System.out.println("数据变了!!!");
  60. byte[] data = event.getData().getData();
  61. System.out.println(new String(data));
  62. }
  63. }
  64. });
  65. //3. 开启
  66. pathChildrenCache.start();
  67. while (true){
  68. }
  69. }
  70. }

10.Watch监听-TreeCache 

可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache的组合

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.cache.*;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.junit.After;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. public class CuratorWatcherTest {
  11. /*
  12. * 建立连接
  13. */
  14. private CuratorFramework client;
  15. @Before
  16. public void testConnect() {
  17. /*
  18. * connectString 连接字符串。zk server 地址和端口
  19. * sessionTimeoutMs 会话差事时间 单位ms
  20. * connectionTimeoutMs 连接超时时间 单位ms
  21. * retryPolicy 重试策略
  22. */
  23. //重试策略
  24. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  25. //第一种方式
  26. /*CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.235.129:2181",
  27. 60*1000,15*1000,retryPolicy);*/
  28. //常用第二种方式,更直观
  29. client = CuratorFrameworkFactory.builder()
  30. .connectString("192.168.235.129:2181")
  31. .sessionTimeoutMs(60 * 1000)
  32. .connectionTimeoutMs(15 * 1000)
  33. .retryPolicy(retryPolicy)
  34. .namespace("ljb")
  35. .build();
  36. //开启连接
  37. client.start();
  38. }
  39. @After
  40. public void close() {
  41. if (client != null) {
  42. client.close();
  43. }
  44. }
  45. @Test
  46. public void testTreeCache() throws Exception {
  47. //1. 创建监听器
  48. TreeCache treeCache = new TreeCache(client,"/app2");
  49. //2. 注册监听
  50. treeCache.getListenable().addListener(new TreeCacheListener() {
  51. @Override
  52. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  53. System.out.println("节点变化了");
  54. System.out.println(event);
  55. }
  56. });
  57. //3. 开启
  58. treeCache.start();
  59. while (true){
  60. }
  61. }
  62. }

3.分布式锁

1.概述

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized(同步)或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题

但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题

那么就需要一种更加高级的锁机制,来处理这种跨机器的进程之间的数据同步问题——这就是分布式锁

2.zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点 

1.客户端获取锁时,在lock节点下创建临时顺序节点。

2.然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。

3.如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件。

4.如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的;如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听

为什么创建的是临时顺序节点: 如果是持久化结点,如果获取锁的节点宕机,锁就不会被释放,如果是临时的锁就会被自动释放。是顺序节点是因为要找最小的节点所以要排个顺序

3.模拟12306售票案例

在Curator中有五种锁方案:

InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

创建线程进行加锁设置

  1. package com.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import java.util.concurrent.TimeUnit;
  8. public class Ticket12306 implements Runnable{
  9. private int tickets = 10;//数据库的票数
  10. private InterProcessMutex lock ;
  11. //在构造方法中创建连接,并且初始化锁
  12. public Ticket12306() {
  13. //重试策略
  14. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  15. //2.第二种方式
  16. //CuratorFrameworkFactory.builder();
  17. CuratorFramework client = CuratorFrameworkFactory.builder()
  18. .connectString("192.168.235.129:2181")
  19. .sessionTimeoutMs(60 * 1000)
  20. .connectionTimeoutMs(15 * 1000)
  21. .retryPolicy(retryPolicy)
  22. .build();
  23. //开启连接
  24. client.start();
  25. try {
  26. if (client.checkExists().forPath("/lock") != null) {
  27. //删除之前测试遗留的/lock节点
  28. client.delete().deletingChildrenIfNeeded().forPath("/lock");
  29. }
  30. } catch (Exception e) {
  31. e.printStackTrace();
  32. }
  33. lock = new InterProcessMutex(client, "/lock");
  34. }
  35. @Override
  36. public void run() {
  37. while(true){
  38. //获取锁
  39. try {
  40. lock.acquire(3, TimeUnit.SECONDS);//时间与时间单位
  41. if(tickets > 0){
  42. System.out.println(Thread.currentThread()+":"+tickets);
  43. Thread.sleep(100);
  44. tickets--;
  45. }
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. }finally {
  49. //释放锁
  50. try {
  51. lock.release();
  52. } catch (Exception e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. }
  57. }
  58. }

测试

  1. package com.curator;
  2. public class LockTest {
  3. public static void main(String[] args) {
  4. Ticket12306 ticket12306 = new Ticket12306();
  5. //创建客户端
  6. Thread t1 = new Thread(ticket12306,"携程");
  7. Thread t2 = new Thread(ticket12306,"飞猪");
  8. t1.start();
  9. t2.start();
  10. }
  11. }

 

4.Zookeeper 集群搭建 

1.Zookeeper集群概述

Leader选举:

Serverid:服务器ID

比如有三台服务器,编号分别是1,2,3

编号越大在选择算法中的权重越大

Zxid:数据ID

服务器中存放的最大数据ID,值越大说明数据越新,在选举算法中数据越新权重越大

在Leader选举的过程中,如果某台ZooKeeper获得了超过半数的选票,则此ZooKeeper就可以成为Leader了

2.集群搭建

搭建要求

真实的集群是需要部署在不同的服务器上的,但是在我们测试时同时启动很多个虚拟机内存会吃不消,所以我们通常会搭建伪集群,也就是把所有的服务都搭建在一台虚拟机上,用端口进行区分。

我们这里要求搭建一个三个节点的Zookeeper集群(伪集群)

3.搭建准备

建立/usr/local/zookeeper-cluster目录,将解压后的Zookeeper复制到以下三个目录

mkdir /usr/local/zookeeper-cluster

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-1

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-2

cp -r  apache-zookeeper-3.8.3-bin /usr/local/zookeeper-cluster/zookeeper-3

创建data目录 ,并且将 conf下zoo_sample.cfg 文件改名为 zoo.cfg

mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
mkdir /usr/local/zookeeper-cluster/zookeeper-3/data

mv  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg

mv  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

mv  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg  /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183

vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
dataDir=/usr/local/zookeeper-cluster/zookeeper-1/data

clientPort=2181

vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg

clientPort=2182
dataDir=/usr/local/zookeeper-cluster/zookeeper-2/data

vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

clientPort=2183
dataDir=/usr/local/zookeeper-cluster/zookeeper-3/data

4.配置集群

在每个zookeeper的 data 目录下创建一个 myid 文件,内容分别是1、2、3 。这个文件就是记录每个服务器的ID

echo 1 >/usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 >/usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 >/usr/local/zookeeper-cluster/zookeeper-3/data/myid
 

在每一个zookeeper 的 zoo.cfg配置客户端访问端口(clientPort)和集群服务器IP列表

vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg

server.1=192.168.235.129:2881:3881
server.2=192.168.235.129:2882:3882
server.3=192.168.235.129:2883:3883

注:server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口

5.启动集群

启动集群就是分别启动每个实例

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start

启动后查询一下每个实例的运行状态

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

先查询第一个服务:Mode为follower表示是跟随者(从)

再查询第二个服务:Mod 为leader表示是领导者(主)

查询第三个为跟随者(从)

6.模拟集群异常 

首先我们先测试如果是从服务器挂掉,会怎么样

把3号服务器停掉,观察1号和2号,发现状态并没有变化

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

由此得出结论,3个节点的集群,从服务器挂掉,集群正常

再把1号服务器(从服务器)也停掉,查看2号(主服务器)的状态,发现已经停止运行了

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

3个节点的集群,2个从服务器都挂掉,主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数

我们再次把1号服务器启动起来,发现2号服务器又开始正常工作了。而且依然是领导者

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status

我们把3号服务器也启动起来,把2号服务器停掉,停掉后观察1号和3号的状态

/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop

/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

发现新的leader产生了

当集群中的主服务器挂了,集群中的其他服务器会自动进行选举状态,然后产生新得leader

我们再次测试,当我们把2号服务器重新启动起来启动后,会发生什么?2号服务器会再次成为新的领导吗

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start

/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status

我们会发现,2号服务器启动后依然是跟随者(从服务器),3号服务器依然是领导者(主服务器),没有撼动3号服务器的领导地位

当领导者产生后,再次有新服务器加入集群,不会影响到现任领导者

7.Zookeeper 集群核心理论

在ZooKeeper集群服务中有三个角色:

Leader 领导者 :

1.处理事务请求(增删改操作)

2.集群内部各服务器的调度者

Follower 跟随者 :

1.处理客户端非事务请求(查询操作),转发事务请求给Leader服务器

2.参与Leader选举投票

Observer 观察者:

处理客户端非事务请求(查询操作),转发事务请求给Leader服务器

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/440039
推荐阅读
相关标签
  

闽ICP备14008679号