当前位置:   article > 正文

Zookeeper学习一

Zookeeper学习一

初识 Zookeeper

Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务(B树)

Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务

Zookeeper 提供的主要功能包括: 配置管理 分布式锁 集群管理

Zookeeper 安装与配置

1.1 下载安装

1、环境准备

ZooKeeper服务器是用Java创建的,它运行在JVM之上。需要安装JDK 7或更高版本。

2、上传

将下载的ZooKeeper放到/opt/ZooKeeper目录下

  1. #上传zookeeper alt+p
  2. put f:/setup/apache-zookeeper-3.5.6-bin.tar.gz
  3. #打开 opt目录
  4. cd /opt
  5. #创建zooKeeper目录
  6. mkdir zooKeeper
  7. #将zookeeper安装包移动到 /opt/zooKeeper
  8. mv apache-zookeeper-3.5.6-bin.tar.gz /opt/zookeeper/

3、解压

将tar包解压到/opt/zookeeper目录下

tar -zxvf apache-ZooKeeper-3.5.6-bin.tar.gz 

1.2 配置启动

1、配置zoo.cfg

进入到conf目录拷贝一个zoo_sample.cfg并完成配置

  1. #进入到conf目录
  2. cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/
  3. #拷贝
  4. cp zoo_sample.cfg zoo.cfg

修改zoo.cfg

  1. #打开目录
  2. cd /opt/zooKeeper/
  3. #创建zooKeeper存储目录
  4. mkdir zkdata
  5. #修改zoo.cfg
  6. vim /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/conf/zoo.cfg

修改存储目录:dataDir=/opt/zookeeper/zkdata

2、启动ZooKeeper

  1. cd /opt/zooKeeper/apache-zooKeeper-3.5.6-bin/bin/
  2. #启动
  3. ./zkServer.sh  start

看到上图表示ZooKeeper成功启动

3、查看ZooKeeper状态

./zkServer.sh status

zookeeper启动成功。standalone代表zk没有搭建集群,现在是单节点

zookeeper没有启动

Zookeeper 命令操作

Zookeeper 数据模型

ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。

这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。

 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。

节点可以分为四大类:

  • PERSISTENT 持久化节点
  • EPHEMERAL 临时节点 :-e
  • PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
  • EPHEMERAL_SEQUENTIAL 临时顺序节点  :-es

Zookeeper服务端常用命令

  • 启动 ZooKeeper 服务: ./zkServer.sh start
  • 查看 ZooKeeper 服务状态: ./zkServer.sh status
  • 停止 ZooKeeper 服务: ./zkServer.sh stop
  • 重启 ZooKeeper 服务: ./zkServer.sh restart

Zookeeper客户端常见命令

连接ZooKeeper服务器:./zkCli.sh –server ip:port

断开连接:quit

设置节点值:set /节点path value

查看命令帮助:help

删除单个节点:delete /节点path

显示指定目录下节点:ls 目录

删除带有子节点的节点:deleteall /节点path

创建节点:create /节点path value

获取节点值:get /节点path

Zookeeper JavaAPI 操作

建立连接

建立连接有两种方式,一种是调用工厂对象的newClient()方法,另一种就是调用工厂对象的builder(),通过链式调用的方法就连接信息传入工厂中。

下面是代码示例:

  1. /**
  2. * 建立连接
  3. */
  4. @Before
  5. public void testConnect() {
  6. /*
  7. *
  8. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
  9. * @param sessionTimeoutMs 会话超时时间 单位ms
  10. * @param connectionTimeoutMs 连接超时时间 单位ms
  11. * @param retryPolicy 重试策略
  12. */
  13. /* //重试策略
  14. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
  15. //1.第一种方式
  16. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
  17. 60 * 1000, 15 * 1000, retryPolicy);*/
  18. //重试策略
  19. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  20. //2.第二种方式
  21. //CuratorFrameworkFactory.builder();
  22. client = CuratorFrameworkFactory.builder()
  23. .connectString("192.168.149.135:2181")
  24. .sessionTimeoutMs(60 * 1000)
  25. .connectionTimeoutMs(15 * 1000)
  26. .retryPolicy(retryPolicy)
  27. //命名空间,使创建的节点都在命名空间的路径下面
  28. .namespace("kjz")
  29. .build();
  30. //开启连接
  31. client.start();
  32. }

注意此方法需要加上@Before注解,表示其他测试方法执行前需要先执行加了@Before注解的方法。因为每次进行crud操作时都需要与ZooKeeper Server建立连接。

建立了连接,操作完毕后同时需要释放连接,在对应方法上面加一个@After注解,表示每次进行测试最后都要执行该方法。

  1. @After
  2. public void close() {
  3. if (client != null) {
  4. client.close();
  5. }
  6. }

添加节点

代码示例如下:

  1. /**
  2. * 创建节点:create 持久 临时 顺序 数据
  3. * 1. 基本创建 :create().forPath("")
  4. * 2. 创建节点 带有数据:create().forPath("",data)
  5. * 3. 设置节点的类型:create().withMode().forPath("",data)
  6. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
  7. */
  8. @Test
  9. public void testCreate() throws Exception {
  10. //2. 创建节点 带有数据
  11. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  12. String path = client.create().forPath("/app2", "hehe".getBytes());
  13. System.out.println(path);
  14. }
  15. @Test
  16. public void testCreate2() throws Exception {
  17. //1. 基本创建
  18. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  19. String path = client.create().forPath("/app1");
  20. System.out.println(path);
  21. }
  22. @Test
  23. public void testCreate3() throws Exception {
  24. //3. 设置节点的类型
  25. //默认类型:持久化
  26. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  27. System.out.println(path);
  28. }
  29. @Test
  30. public void testCreate4() throws Exception {
  31. //4. 创建多级节点 /app1/p1
  32. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  33. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  34. System.out.println(path);
  35. }

Curator中提供了一个枚举类,里面定义了设置不同类型节点的常量

运行testCreate3()时我发现ZooKeeper中并没有保存我创建的节点,原因是在创建节点时指定的节点类型为临时节点,临时节点在会话结束后就会删除。我通过运行testCreate3()这个方法来创建一个节点,运行结束,说明会话也结束了,ZooKeeper就会把节点删除了。

删除节点

删除节点: delete deleteall

删除单个节点:delete().forPath("/app1");

删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");

必须成功的删除:为了防止网络抖动。本质就是重试。client.delete().guaranteed().forPath("/app2");

回调:inBackground;

代码示例:

  1. /**
  2. * 删除节点: delete deleteall
  3. * 1. 删除单个节点:delete().forPath("/app1");
  4. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
  5. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
  6. * 4. 回调:inBackground
  7. * @throws Exception
  8. */
  9. @Test
  10. public void testDelete() throws Exception {
  11. // 1. 删除单个节点
  12. client.delete().forPath("/app1");
  13. }
  14. @Test
  15. public void testDelete2() throws Exception {
  16. //2. 删除带有子节点的节点
  17. client.delete().deletingChildrenIfNeeded().forPath("/app4");
  18. }
  19. @Test
  20. public void testDelete3() throws Exception {
  21. //3. 必须成功的删除
  22. client.delete().guaranteed().forPath("/app2");
  23. }
  24. @Test
  25. public void testDelete4() throws Exception {
  26. //4. 回调
  27. client.delete().guaranteed().inBackground(new BackgroundCallback(){
  28. @Override
  29. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  30. System.out.println("我被删除了~");
  31. System.out.println(event);
  32. }
  33. }).forPath("/app1");
  34. }

修改节点

基本修改数据 setData().forPath()

根据版本修改数据 setData().withVersion().forPath()

version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。

代码示例:

  1. /**
  2. * 修改数据
  3. * 1. 基本修改数据:setData().forPath()
  4. * 2. 根据版本修改: setData().withVersion().forPath()
  5. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
  6. *
  7. * @throws Exception
  8. */
  9. @Test
  10. public void testSet() throws Exception {
  11. client.setData().forPath("/app1", "itcast".getBytes());
  12. }
  13. @Test
  14. public void testSetForVersion() throws Exception {
  15. Stat status = new Stat();
  16. //3. 查询节点状态信息:ls -s
  17. client.getData().storingStatIn(status).forPath("/app1");
  18. int version = status.getVersion();//查询出来的 3
  19. System.out.println(version);
  20. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
  21. }

查询节点

查询数据:get: getData().forPath()

查询子节点: ls: getChildren().forPath()

查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath() 

代码示例:

  1. @Test
  2. public void testGet1() throws Exception {
  3. //1. 查询数据:get
  4. byte[] data = client.getData().forPath("/app1");
  5. System.out.println(new String(data));
  6. }
  7. @Test
  8. public void testGet2() throws Exception {
  9. // 2. 查询子节点: ls
  10. List<String> path = client.getChildren().forPath("/");
  11. System.out.println(path);
  12. }
  13. @Test
  14. public void testGet3() throws Exception {
  15. Stat status = new Stat();
  16. System.out.println(status);
  17. //3. 查询节点状态信息:ls -s
  18. client.getData().storingStatIn(status).forPath("/app1");
  19. System.out.println(status);
  20. }

Stat里面封装了节点的状态信息

完整代码如下:

  1. package com.kjz.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.api.BackgroundCallback;
  6. import org.apache.curator.framework.api.CuratorEvent;
  7. import org.apache.curator.retry.ExponentialBackoffRetry;
  8. import org.apache.zookeeper.CreateMode;
  9. import org.apache.zookeeper.data.Stat;
  10. import org.junit.After;
  11. import org.junit.Before;
  12. import org.junit.Test;
  13. import java.util.List;
  14. public class CuratorTest {
  15. private CuratorFramework client;
  16. /**
  17. * 建立连接
  18. */
  19. @Before
  20. public void testConnect() {
  21. /*
  22. *
  23. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
  24. * @param sessionTimeoutMs 会话超时时间 单位ms
  25. * @param connectionTimeoutMs 连接超时时间 单位ms
  26. * @param retryPolicy 重试策略
  27. */
  28. /* //重试策略
  29. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
  30. //1.第一种方式
  31. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
  32. 60 * 1000, 15 * 1000, retryPolicy);*/
  33. //重试策略
  34. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  35. //2.第二种方式
  36. //CuratorFrameworkFactory.builder();
  37. client = CuratorFrameworkFactory.builder()
  38. .connectString("192.168.149.135:2181")
  39. .sessionTimeoutMs(60 * 1000)
  40. .connectionTimeoutMs(15 * 1000)
  41. .retryPolicy(retryPolicy)
  42. .namespace("kjz")
  43. .build();
  44. //开启连接
  45. client.start();
  46. }
  47. //==============================create=============================================================================
  48. /**
  49. * 创建节点:create 持久 临时 顺序 数据
  50. * 1. 基本创建 :create().forPath("")
  51. * 2. 创建节点 带有数据:create().forPath("",data)
  52. * 3. 设置节点的类型:create().withMode().forPath("",data)
  53. * 4. 创建多级节点 /app1/p1 :create().creatingParentsIfNeeded().forPath("",data)
  54. */
  55. @Test
  56. public void testCreate() throws Exception {
  57. //2. 创建节点 带有数据
  58. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  59. String path = client.create().forPath("/app2", "hehe".getBytes());
  60. System.out.println(path);
  61. }
  62. @Test
  63. public void testCreate2() throws Exception {
  64. //1. 基本创建
  65. //如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
  66. String path = client.create().forPath("/app1");
  67. System.out.println(path);
  68. }
  69. @Test
  70. public void testCreate3() throws Exception {
  71. //3. 设置节点的类型
  72. //默认类型:持久化
  73. String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
  74. System.out.println(path);
  75. }
  76. @Test
  77. public void testCreate4() throws Exception {
  78. //4. 创建多级节点 /app1/p1
  79. //creatingParentsIfNeeded():如果父节点不存在,则创建父节点
  80. String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
  81. System.out.println(path);
  82. }
  83. //===========================get================================================================================
  84. /**
  85. * 查询节点:
  86. * 1. 查询数据:get: getData().forPath()
  87. * 2. 查询子节点: ls: getChildren().forPath()
  88. * 3. 查询节点状态信息:ls -s:getData().storingStatIn(状态对象).forPath()
  89. */
  90. @Test
  91. public void testGet1() throws Exception {
  92. //1. 查询数据:get
  93. byte[] data = client.getData().forPath("/app1");
  94. System.out.println(new String(data));
  95. }
  96. @Test
  97. public void testGet2() throws Exception {
  98. // 2. 查询子节点: ls
  99. List<String> path = client.getChildren().forPath("/");
  100. System.out.println(path);
  101. }
  102. @Test
  103. public void testGet3() throws Exception {
  104. Stat status = new Stat();
  105. System.out.println(status);
  106. //3. 查询节点状态信息:ls -s
  107. client.getData().storingStatIn(status).forPath("/app1");
  108. System.out.println(status);
  109. }
  110. //===========================set================================================================================
  111. /**
  112. * 修改数据
  113. * 1. 基本修改数据:setData().forPath()
  114. * 2. 根据版本修改: setData().withVersion().forPath()
  115. * * version 是通过查询出来的。目的就是为了让其他客户端或者线程不干扰我。
  116. *
  117. * @throws Exception
  118. */
  119. @Test
  120. public void testSet() throws Exception {
  121. client.setData().forPath("/app1", "kjz".getBytes());
  122. }
  123. @Test
  124. public void testSetForVersion() throws Exception {
  125. Stat status = new Stat();
  126. //3. 查询节点状态信息:ls -s
  127. client.getData().storingStatIn(status).forPath("/app1");
  128. int version = status.getVersion();//查询出来的 3
  129. System.out.println(version);
  130. client.setData().withVersion(version).forPath("/app1", "hehe".getBytes());
  131. }
  132. //===========================delete================================================================================
  133. /**
  134. * 删除节点: delete deleteall
  135. * 1. 删除单个节点:delete().forPath("/app1");
  136. * 2. 删除带有子节点的节点:delete().deletingChildrenIfNeeded().forPath("/app1");
  137. * 3. 必须成功的删除:为了防止网络抖动。本质就是重试。 client.delete().guaranteed().forPath("/app2");
  138. * 4. 回调:inBackground
  139. * @throws Exception
  140. */
  141. @Test
  142. public void testDelete() throws Exception {
  143. // 1. 删除单个节点
  144. client.delete().forPath("/app1");
  145. }
  146. @Test
  147. public void testDelete2() throws Exception {
  148. //2. 删除带有子节点的节点
  149. client.delete().deletingChildrenIfNeeded().forPath("/app4");
  150. }
  151. @Test
  152. public void testDelete3() throws Exception {
  153. //3. 必须成功的删除
  154. client.delete().guaranteed().forPath("/app2");
  155. }
  156. @Test
  157. public void testDelete4() throws Exception {
  158. //4. 回调
  159. client.delete().guaranteed().inBackground(new BackgroundCallback(){
  160. @Override
  161. public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
  162. System.out.println("我被删除了~");
  163. System.out.println(event);
  164. }
  165. }).forPath("/app1");
  166. }
  167. @After
  168. public void close() {
  169. if (client != null) {
  170. client.close();
  171. }
  172. }
  173. }

Watch事件监听

  • ZooKeeper 允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,ZooKeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 ZooKeeper 实现分布式协调服务的重要特性。
  • ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
  • ZooKeeper 原生支持通过注册Watcher来进行事件监听,但是其使用并不是特别方便     需要开发人员自己反复注册Watcher,比较繁琐。
  • Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。

ZooKeeper提供了三种Watcher:

  •         NodeCache : 只是监听某一个特定的节点
  •          PathChildrenCache : 监控一个ZNode的子节点.
  •          TreeCache : 可以监控整个树上的所有节点,类似于PathChildrenCache和NodeCache             的组合

演示 NodeCache:给指定一个节点注册监听器

  1. /**
  2. * 演示 NodeCache:给指定一个节点注册监听器
  3. */
  4. @Test
  5. public void testNodeCache() throws Exception {
  6. //1. 创建NodeCache对象
  7. final NodeCache nodeCache = new NodeCache(client,"/app1");
  8. //2. 注册监听
  9. nodeCache.getListenable().addListener(new NodeCacheListener() {
  10. @Override
  11. public void nodeChanged() throws Exception {
  12. System.out.println("节点变化了~");
  13. //获取修改节点后的数据
  14. byte[] data = nodeCache.getCurrentData().getData();
  15. System.out.println(new String(data));
  16. }
  17. });
  18. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
  19. nodeCache.start(true);
  20. while (true){
  21. }
  22. }

演示 PathChildrenCache:监听某个节点的所有子节点们

  1. /**
  2. * 演示 PathChildrenCache:监听某个节点的所有子节点们
  3. */
  4. @Test
  5. public void testPathChildrenCache() throws Exception {
  6. //1.创建监听对象
  7. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
  8. //2. 绑定监听器
  9. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  10. @Override
  11. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  12. System.out.println("子节点变化了~");
  13. System.out.println(event);
  14. //监听子节点的数据变更,并且拿到变更后的数据
  15. //1.获取类型
  16. PathChildrenCacheEvent.Type type = event.getType();
  17. //2.判断类型是否是update
  18. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  19. System.out.println("数据变了!!!");
  20. byte[] data = event.getData().getData();
  21. System.out.println(new String(data));
  22. }
  23. }
  24. });
  25. //3. 开启
  26. pathChildrenCache.start();
  27. while (true){
  28. }
  29. }

数据变更类型的枚举类。

演示 TreeCache:监听某个节点自己和所有子节点们,相当于PathChildrenCache和NodeCache的组合

  1. /**
  2. * 演示 NodeCache:给指定一个节点注册监听器
  3. */
  4. @Test
  5. public void testNodeCache() throws Exception {
  6. //1. 创建NodeCache对象
  7. final NodeCache nodeCache = new NodeCache(client,"/app1");
  8. //2. 注册监听
  9. nodeCache.getListenable().addListener(new NodeCacheListener() {
  10. @Override
  11. public void nodeChanged() throws Exception {
  12. System.out.println("节点变化了~");
  13. //获取修改节点后的数据
  14. byte[] data = nodeCache.getCurrentData().getData();
  15. System.out.println(new String(data));
  16. }
  17. });
  18. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
  19. nodeCache.start(true);
  20. while (true){
  21. }
  22. }

完整代码如下:

  1. package com.kjz.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.api.BackgroundCallback;
  6. import org.apache.curator.framework.api.CuratorEvent;
  7. import org.apache.curator.framework.recipes.cache.*;
  8. import org.apache.curator.retry.ExponentialBackoffRetry;
  9. import org.apache.zookeeper.CreateMode;
  10. import org.apache.zookeeper.data.Stat;
  11. import org.junit.After;
  12. import org.junit.Before;
  13. import org.junit.Test;
  14. import java.util.List;
  15. public class CuratorWatcherTest {
  16. private CuratorFramework client;
  17. /**
  18. * 建立连接
  19. */
  20. @Before
  21. public void testConnect() {
  22. /*
  23. *
  24. * @param connectString 连接字符串。zk server 地址和端口 "192.168.149.135:2181,192.168.149.136:2181"
  25. * @param sessionTimeoutMs 会话超时时间 单位ms
  26. * @param connectionTimeoutMs 连接超时时间 单位ms
  27. * @param retryPolicy 重试策略
  28. */
  29. /* //重试策略
  30. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
  31. //1.第一种方式
  32. CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.149.135:2181",
  33. 60 * 1000, 15 * 1000, retryPolicy);*/
  34. //重试策略
  35. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  36. //2.第二种方式
  37. //CuratorFrameworkFactory.builder();
  38. client = CuratorFrameworkFactory.builder()
  39. .connectString("192.168.149.135:2181")
  40. .sessionTimeoutMs(60 * 1000)
  41. .connectionTimeoutMs(15 * 1000)
  42. .retryPolicy(retryPolicy)
  43. .namespace("kjz")
  44. .build();
  45. //开启连接
  46. client.start();
  47. }
  48. @After
  49. public void close() {
  50. if (client != null) {
  51. client.close();
  52. }
  53. }
  54. /**
  55. * 演示 NodeCache:给指定一个节点注册监听器
  56. */
  57. @Test
  58. public void testNodeCache() throws Exception {
  59. //1. 创建NodeCache对象
  60. final NodeCache nodeCache = new NodeCache(client,"/app1");
  61. //2. 注册监听
  62. nodeCache.getListenable().addListener(new NodeCacheListener() {
  63. @Override
  64. public void nodeChanged() throws Exception {
  65. System.out.println("节点变化了~");
  66. //获取修改节点后的数据
  67. byte[] data = nodeCache.getCurrentData().getData();
  68. System.out.println(new String(data));
  69. }
  70. });
  71. //3. 开启监听.如果设置为true,则开启监听是,加载缓冲数据
  72. nodeCache.start(true);
  73. while (true){
  74. }
  75. }
  76. /**
  77. * 演示 PathChildrenCache:监听某个节点的所有子节点们
  78. */
  79. @Test
  80. public void testPathChildrenCache() throws Exception {
  81. //1.创建监听对象
  82. PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
  83. //2. 绑定监听器
  84. pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
  85. @Override
  86. public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
  87. System.out.println("子节点变化了~");
  88. System.out.println(event);
  89. //监听子节点的数据变更,并且拿到变更后的数据
  90. //1.获取类型
  91. PathChildrenCacheEvent.Type type = event.getType();
  92. //2.判断类型是否是update
  93. if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
  94. System.out.println("数据变了!!!");
  95. byte[] data = event.getData().getData();
  96. System.out.println(new String(data));
  97. }
  98. }
  99. });
  100. //3. 开启
  101. pathChildrenCache.start();
  102. while (true){
  103. }
  104. }
  105. /**
  106. * 演示 TreeCache:监听某个节点自己和所有子节点们
  107. */
  108. @Test
  109. public void testTreeCache() throws Exception {
  110. //1. 创建监听器
  111. TreeCache treeCache = new TreeCache(client,"/app2");
  112. //2. 注册监听
  113. treeCache.getListenable().addListener(new TreeCacheListener() {
  114. @Override
  115. public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
  116. System.out.println("节点变化了");
  117. System.out.println(event);
  118. }
  119. });
  120. //3. 开启
  121. treeCache.start();
  122. while (true){
  123. }
  124. }
  125. }

分布式锁实现

在我们进行单机应用开发,涉及并发同步的时候,我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题,这时多线程的运行都是在同一个JVM之下,没有任何问题。 但当我们的应用是分布式集群工作的情况下,属于多JVM下的工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题。 那么就需要一种更加高级的锁机制,来处理种 跨机器的进程之间的数据同步问题——这就是分布式锁。
分布式锁常见的实现方式:
下面介绍ZooKeeper的实现方式:

Zookeeper分布式锁原理

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点。

  1. 客户端获取锁时,在lock节点下创建临时顺序节点。临时:防止获取到锁的服务宕机了导致锁无法释放,临时节点会在会话结束后自动删除。顺序:找到最小节点,创建最小节点的服务获取到锁。
  2. 然后获取lock下面的所有子节点,客户端获取到所有的子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。使用完锁后,将该节点删除。
  3. 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
  4. 如果发现比自己小的那个节点被删除,则客户端的 Watcher会收到相应通知,此时再次判断自己创建的节点     是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点     并注册监听。

Curator实现分布式锁API

在Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排它锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

案例:模拟12306售票

模拟12306服务

  1. package com.kjz.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import java.util.concurrent.TimeUnit;
  8. public class Ticket12306 implements Runnable{
  9. private int tickets = 10;//数据库的票数
  10. private InterProcessMutex lock ;
  11. public Ticket12306(){
  12. //重试策略
  13. RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
  14. //2.第二种方式
  15. //CuratorFrameworkFactory.builder();
  16. CuratorFramework client = CuratorFrameworkFactory.builder()
  17. .connectString("192.168.149.135:2181")
  18. .sessionTimeoutMs(60 * 1000)
  19. .connectionTimeoutMs(15 * 1000)
  20. .retryPolicy(retryPolicy)
  21. .build();
  22. //开启连接
  23. client.start();
  24. lock = new InterProcessMutex(client,"/lock");
  25. }
  26. @Override
  27. public void run() {
  28. while(true){
  29. //获取锁
  30. try {
  31. lock.acquire(3, TimeUnit.SECONDS);
  32. if(tickets > 0){
  33. System.out.println(Thread.currentThread()+":"+tickets);
  34. Thread.sleep(100);
  35. tickets--;
  36. }
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }finally {
  40. //释放锁
  41. try {
  42. lock.release();
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }
  48. }
  49. }

模拟买票

  1. package com.kjz.curator;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.framework.recipes.cache.*;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.junit.After;
  8. import org.junit.Before;
  9. import org.junit.Test;
  10. public class LockTest {
  11. public static void main(String[] args) {
  12. Ticket12306 ticket12306 = new Ticket12306();
  13. //创建客户端
  14. Thread t1 = new Thread(ticket12306,"携程");
  15. Thread t2 = new Thread(ticket12306,"飞猪");
  16. t1.start();
  17. t2.start();
  18. }
  19. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/357463
推荐阅读
相关标签
  

闽ICP备14008679号