当前位置:   article > 正文

ZooKeeper快速入门学习+在springboot中的应用+监听机制的业务使用_zookeeper在spring项目中的应用

zookeeper在spring项目中的应用

目录

前言

基础知识

一、什么是ZooKeeper

二、为什么使用ZooKeeper

三、数据结构

四、监听通知机制

五、选举机制

使用

1 下载zookeeper

2 修改

3 排错

在SpringBoot中的使用

安装可视化插件

依赖 配置

安装httpclient方便测试

增删查改

新建控制器

创建节点

查询节点

更新节点

删除节点

使用监听

新建监听器

更改控制器中的方法。

使用httpclient请求,结果如下 

注意事项

业务使用


前言

在很多时候,我们都可以在各种框架应用中看到ZooKeeper的身影,比如Kafka中间件,Dubbo框架,Hadoop等等。为什么到处都看到ZooKeeper?

基础知识

一、什么是ZooKeeper

        ZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特点,可以实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。

        Zookeeper 的核心实现是一个分布式的数据存储系统,其内部采用 ZAB 协议(Zookeeper Atomic Broadcast)进行主从复制,确保了数据的一致性和可靠性。在 Zookeeper 中,数据存储采用了一种称为“Znode”的数据模型,类似于 Unix 文件系统。

Znode 是 Zookeeper 中最基本的数据单元,是一个有层级的树形结构,每个节点都有一个路径,其中根节点为“/”,子节点路径会在父节点路径的基础上加上相对路径,最终形成一棵完整的树形结构。

除了 Znode 外,Zookeeper 还支持节点的监听和事件机制,监听可以让客户端对 Znode 的变化做出及时响应,增强了应用的实时性。

二、为什么使用ZooKeeper

ZooKeeper能保证:

  • 更新请求顺序进行。来自同一个client的更新请求按其发送顺序依次执行
  • 数据更新原子性。一次数据更新要么成功,要么失败
  • 全局唯一数据视图。client无论连接到哪个server,数据视图都是一致的
  • 实时性。在一定时间范围内,client读到的数据是最新的

三、数据结构

ZooKeeper的数据结构和Unix文件系统很类似,总体上可以看做是一棵树,每一个节点称之为一个ZNode,每一个ZNode默认能存储1M的数据。每一个ZNode可通过唯一的路径标识。如下图所示:

创建ZNode时,可以指定以下四种类型,包括:

  • PERSISTENT,持久性ZNode。创建后,即使客户端与服务端断开连接也不会删除,只有客户端主动删除才会消失。
  • PERSISTENT_SEQUENTIAL,持久性顺序编号ZNode。和持久性节点一样不会因为断开连接后而删除,并且ZNode的编号会自动增加。
  • EPHEMERAL,临时性ZNode。客户端与服务端断开连接,该ZNode会被删除。
  • EPEMERAL_SEQUENTIAL,临时性顺序编号ZNode。和临时性节点一样,断开连接会被删除,并且ZNode的编号会自动增加。

四、监听通知机制

Watcher是基于观察者模式实现的一种机制。如果我们需要实现当某个ZNode节点发生变化时收到通知,就可以使用Watcher监听器

客户端通过设置监视点(watcher)向 ZooKeeper 注册需要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息

这种通知机制是一次性的。一旦watcher被触发,ZooKeeper就会从相应的存储中删除。如果需要不断监听ZNode的变化,可以在收到通知后再设置新的watcher注册到ZooKeeper。

监视点的类型有很多,如监控ZNode数据变化、监控ZNode子节点变化、监控ZNode 创建或删除

五、选举机制

ZooKeeper是一个高可用的应用框架,因为ZooKeeper是支持集群的。ZooKeeper在集群状态下,配置文件是不会指定Master和Slave,而是在ZooKeeper服务器初始化时就在内部进行选举,产生一台做为Leader,多台做为Follower,并且遵守半数可用原则。

由于遵守半数可用原则,所以5台服务器和6台服务器,实际上最大允许宕机数量都是3台,所以为了节约成本,集群的服务器数量一般设置为奇数

如果在运行时,如果长时间无法和Leader保持连接的话,则会再次进行选举,产生新的Leader,以保证服务的可用

使用

1 下载zookeeper

https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz

2 修改

将下面的这个配置文件复制一份然后将新复制的改名为zoo.cfg

然后更改文件内容为

  1. tickTime=2000
  2. dataDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\data
  3. dataLogDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\logs
  4. clientPort=2181

创建设置的目录

双击启动。。

3 排错

wc,闪卡了!!

别急我们可能是JAVA_HOME没有配置好,我们进入环境变量配置JAVA_HOME

注意这里是jdk的主路径,不带bin奥!! 

ok,再次启动试试,好了,这次没有问题了

在SpringBoot中的使用

安装可视化插件

 侧边

新建连接

127.0.0.1:2181

 然后点击connect

 

依赖 配置

  1. <dependency>
  2. <groupId>org.apache.zookeeper</groupId>
  3. <artifactId>zookeeper</artifactId>
  4. <version>3.4.6</version>
  5. </dependency>
  1. zookeeper:
  2. server: 127.0.0.1:2181
  3. timeout: 3000

配置类

 

  1. package com.scm.springbootzookper.config;
  2. import org.apache.zookeeper.ZooKeeper;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import java.io.IOException;
  7. @Configuration
  8. public class ZookeeperConfig {
  9. @Value("${zookeeper.server}")
  10. private String server;
  11. @Value("${zookeeper.timeout}")
  12. private Integer timeout;
  13. @Bean
  14. public ZooKeeper zkClient() throws IOException {
  15. return new ZooKeeper(server, timeout, watchedEvent -> {});
  16. }
  17. }

安装httpclient方便测试

可参照以下

https://blog.csdn.net/qq_53679247/article/details/130841001

增删查改

新建控制器

  1. package com.scm.springbootzookper.controller;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.ZooKeeper;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.*;
  6. @RestController
  7. @RequestMapping("/api")
  8. public class ZookController {
  9. @Autowired
  10. ZooKeeper zkClient;
  11. @GetMapping("/zookeeper")
  12. public String getData() throws KeeperException, InterruptedException {
  13. String path = "/zookeeper";
  14. boolean watch = true;
  15. byte[] data = zkClient.getData(path, watch, null);
  16. return new String(data);
  17. }
  18. }

使用http client测试

 

创建节点

API

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  • path ZNode路径
  • data ZNode存储的数据
  • acl ACL权限控制
  • createMode ZNode类型

 

  1. @GetMapping("/addNode/{nodename}/{data}")
  2. public String addNode(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
  3. // 创建节点的路径
  4. String path = "/"+nodename;
  5. // 节点数据
  6. String data =data1;
  7. // 权限控制
  8. List<ACL> aclList = ZooDefs.Ids.OPEN_ACL_UNSAFE;
  9. // 创建节点的类型
  10. CreateMode createMode = CreateMode.PERSISTENT;
  11. String result = null;
  12. try {
  13. result = zkClient.create(path, data.getBytes(), aclList, createMode);
  14. } catch (Exception e) {
  15. throw new RuntimeException(e);
  16. }
  17. return result;
  18. }

httpclent 环境数据

  1. {
  2. "dev": {
  3. "name": "value",
  4. "test": "test",
  5. "nodename": "node1",
  6. "data": "我是测试数据1"
  7. }
  8. }

 

查询节点

  1. @GetMapping("/getData/{nodename}")
  2. public String getData(@PathVariable("nodename") String nodename){
  3. //数据的描述信息,包括版本号,ACL权限,子节点信息等等
  4. Stat stat = new Stat();
  5. //返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
  6. byte[] bytes;
  7. String path="/"+nodename;
  8. try {
  9. bytes = zkClient.getData(path, false, stat);
  10. } catch (Exception e) {
  11. throw new RuntimeException(e);
  12. }
  13. //打印结果
  14. System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
  15. System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
  16. return new String(bytes);
  17. }

更新节点

删除和更新操作,必须获取到版本号才能进行修改

  1. @GetMapping("/setData/{nodename}/{data}")
  2. public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
  3. String path = "/"+nodename;
  4. String data = data1;
  5. int version = 0;
  6. Stat stat = null;
  7. try {
  8. stat = zkClient.setData(path, data.getBytes(), version);
  9. } catch (Exception e) {
  10. throw new RuntimeException(e);
  11. }
  12. return stat.toString();
  13. }

删除节点

  1. @GetMapping("/deleteNode/{nodename}")
  2. public String deleteNode(@PathVariable("nodename")String nodename){
  3. String path = "/"+nodename;
  4. int version = 0;
  5. try {
  6. zkClient.delete(path, version);
  7. } catch (Exception e) {
  8. throw new RuntimeException(e);
  9. }
  10. return "OK!";
  11. }

使用监听

新建监听器

  1. package com.scm.springbootzookper.watch;
  2. import org.apache.zookeeper.WatchedEvent;
  3. import org.apache.zookeeper.Watcher;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MyWatcher implements Watcher {
  7. @Override
  8. public void process(WatchedEvent watchedEvent) {
  9. Event.KeeperState state = watchedEvent.getState();
  10. Event.EventType type = watchedEvent.getType();
  11. System.out.println("检测到节点发生变化.....");
  12. System.out.println("节点名称:"+state.name());
  13. System.out.println("事件类型:"+type.name());
  14. System.out.println("节点路径"+watchedEvent.getPath());
  15. }
  16. }

更改控制器中的方法。

  1. @GetMapping("/setData/{nodename}/{data}")
  2. public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1) throws InterruptedException, KeeperException {
  3. String path = "/"+nodename;
  4. zkClient.exists(path, new MyWatcher());
  5. String data = data1;
  6. // 这里必须先拿到版本号才能更新
  7. int version =5;
  8. Stat stat = null;
  9. try {
  10. stat = zkClient.setData(path, data.getBytes(), version);
  11. } catch (Exception e) {
  12. throw new RuntimeException(e);
  13. }
  14. return stat.toString();
  15. }

使用httpclient请求,结果如下 

注意事项

需要注意的是,注册一次监听器只能使用一次,使用完就失效了。 

串行执行。客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。

业务使用

判断通知时节点的更改类型,进行其他操作。

  1. package com.scm.springbootzookper.watch;
  2. import org.apache.zookeeper.WatchedEvent;
  3. import org.apache.zookeeper.Watcher;
  4. import org.springframework.stereotype.Component;
  5. @Component
  6. public class MyWatcher implements Watcher {
  7. @Override
  8. public void process(WatchedEvent watchedEvent) {
  9. Event.KeeperState state = watchedEvent.getState();
  10. Event.EventType type = watchedEvent.getType();
  11. if (Event.EventType.NodeDataChanged.getIntValue()==type.getIntValue()) {
  12. System.out.println("节点被修改了!");
  13. }
  14. if (Event.EventType.NodeDeleted.getIntValue()==type.getIntValue()) {
  15. System.out.println("节点被删除了!");
  16. }
  17. System.out.println("检测到节点发生变化.....");
  18. System.out.println("节点名称:"+state.name());
  19. System.out.println("事件类型:"+type.name());
  20. System.out.println("节点路径"+watchedEvent.getPath());
  21. }
  22. }

可以进行一些业务操作。

以下是Watch接口的源码,我们可以注意到其中的枚举类型,

  1. //
  2. // Source code recreated from a .class file by IntelliJ IDEA
  3. // (powered by FernFlower decompiler)
  4. //
  5. package org.apache.zookeeper;
  6. public interface Watcher {
  7. void process(WatchedEvent var1);
  8. public interface Event {
  9. public static enum EventType {
  10. None(-1),
  11. NodeCreated(1),
  12. NodeDeleted(2),
  13. NodeDataChanged(3),
  14. NodeChildrenChanged(4);
  15. private final int intValue;
  16. private EventType(int intValue) {
  17. this.intValue = intValue;
  18. }
  19. public int getIntValue() {
  20. return this.intValue;
  21. }
  22. public static EventType fromInt(int intValue) {
  23. switch (intValue) {
  24. case -1:
  25. return None;
  26. case 0:
  27. default:
  28. throw new RuntimeException("Invalid integer value for conversion to EventType");
  29. case 1:
  30. return NodeCreated;
  31. case 2:
  32. return NodeDeleted;
  33. case 3:
  34. return NodeDataChanged;
  35. case 4:
  36. return NodeChildrenChanged;
  37. }
  38. }
  39. }
  40. public static enum KeeperState {
  41. /** @deprecated */
  42. @Deprecated
  43. Unknown(-1),
  44. Disconnected(0),
  45. /** @deprecated */
  46. @Deprecated
  47. NoSyncConnected(1),
  48. SyncConnected(3),
  49. AuthFailed(4),
  50. ConnectedReadOnly(5),
  51. SaslAuthenticated(6),
  52. Expired(-112);
  53. private final int intValue;
  54. private KeeperState(int intValue) {
  55. this.intValue = intValue;
  56. }
  57. public int getIntValue() {
  58. return this.intValue;
  59. }
  60. public static KeeperState fromInt(int intValue) {
  61. switch (intValue) {
  62. case -112:
  63. return Expired;
  64. case -1:
  65. return Unknown;
  66. case 0:
  67. return Disconnected;
  68. case 1:
  69. return NoSyncConnected;
  70. case 3:
  71. return SyncConnected;
  72. case 4:
  73. return AuthFailed;
  74. case 5:
  75. return ConnectedReadOnly;
  76. case 6:
  77. return SaslAuthenticated;
  78. default:
  79. throw new RuntimeException("Invalid integer value for conversion to KeeperState");
  80. }
  81. }
  82. }
  83. }
  84. }

END.........

本文内容由网友自发贡献,转载请注明出处:https://www.wpsshop.cn/blog/article/detail/53994
推荐阅读
相关标签
  

闽ICP备14008679号