赞
踩
目录
在很多时候,我们都可以在各种框架应用中看到ZooKeeper的身影,比如Kafka中间件,Dubbo框架,Hadoop等等。为什么到处都看到ZooKeeper?
ZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特点,可以实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。
Zookeeper 的核心实现是一个分布式的数据存储系统,其内部采用 ZAB 协议(Zookeeper Atomic Broadcast)进行主从复制,确保了数据的一致性和可靠性。在 Zookeeper 中,数据存储采用了一种称为“Znode”的数据模型,类似于 Unix 文件系统。
Znode 是 Zookeeper 中最基本的数据单元,是一个有层级的树形结构,每个节点都有一个路径,其中根节点为“/”,子节点路径会在父节点路径的基础上加上相对路径,最终形成一棵完整的树形结构。
除了 Znode 外,Zookeeper 还支持节点的监听和事件机制,监听可以让客户端对 Znode 的变化做出及时响应,增强了应用的实时性。
ZooKeeper能保证:
ZooKeeper的数据结构和Unix文件系统很类似,总体上可以看做是一棵树,每一个节点称之为一个ZNode,每一个ZNode默认能存储1M的数据。每一个ZNode可通过唯一的路径标识。如下图所示:
创建ZNode时,可以指定以下四种类型,包括:
Watcher是基于观察者模式实现的一种机制。如果我们需要实现当某个ZNode节点发生变化时收到通知,就可以使用Watcher监听器。
客户端通过设置监视点(watcher)向 ZooKeeper 注册需要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息。
这种通知机制是一次性的。一旦watcher被触发,ZooKeeper就会从相应的存储中删除。如果需要不断监听ZNode的变化,可以在收到通知后再设置新的watcher注册到ZooKeeper。
监视点的类型有很多,如监控ZNode数据变化、监控ZNode子节点变化、监控ZNode 创建或删除。
ZooKeeper是一个高可用的应用框架,因为ZooKeeper是支持集群的。ZooKeeper在集群状态下,配置文件是不会指定Master和Slave,而是在ZooKeeper服务器初始化时就在内部进行选举,产生一台做为Leader,多台做为Follower,并且遵守半数可用原则。
由于遵守半数可用原则,所以5台服务器和6台服务器,实际上最大允许宕机数量都是3台,所以为了节约成本,集群的服务器数量一般设置为奇数。
如果在运行时,如果长时间无法和Leader保持连接的话,则会再次进行选举,产生新的Leader,以保证服务的可用。
https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
将下面的这个配置文件复制一份然后将新复制的改名为zoo.cfg
然后更改文件内容为
- tickTime=2000
- dataDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\data
- dataLogDir=X:\\mygreensoftware\\apache-zookeeper-3.8.1-bin\\tmp\\logs
- clientPort=2181
创建设置的目录
双击启动。。
wc,闪卡了!!
别急我们可能是JAVA_HOME没有配置好,我们进入环境变量配置JAVA_HOME
注意这里是jdk的主路径,不带bin奥!!
ok,再次启动试试,好了,这次没有问题了
侧边
新建连接
127.0.0.1:2181
然后点击connect
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
- zookeeper:
- server: 127.0.0.1:2181
- timeout: 3000
配置类
- package com.scm.springbootzookper.config;
-
- import org.apache.zookeeper.ZooKeeper;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.io.IOException;
-
- @Configuration
- public class ZookeeperConfig {
- @Value("${zookeeper.server}")
- private String server;
-
- @Value("${zookeeper.timeout}")
- private Integer timeout;
-
- @Bean
- public ZooKeeper zkClient() throws IOException {
- return new ZooKeeper(server, timeout, watchedEvent -> {});
- }
- }
可参照以下
https://blog.csdn.net/qq_53679247/article/details/130841001
- package com.scm.springbootzookper.controller;
-
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.ZooKeeper;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.*;
-
- @RestController
- @RequestMapping("/api")
- public class ZookController {
-
- @Autowired
- ZooKeeper zkClient;
-
- @GetMapping("/zookeeper")
- public String getData() throws KeeperException, InterruptedException {
- String path = "/zookeeper";
- boolean watch = true;
- byte[] data = zkClient.getData(path, watch, null);
- return new String(data);
- }
-
- }
使用http client测试
API
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
- @GetMapping("/addNode/{nodename}/{data}")
- public String addNode(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
- // 创建节点的路径
- String path = "/"+nodename;
- // 节点数据
- String data =data1;
- // 权限控制
- List<ACL> aclList = ZooDefs.Ids.OPEN_ACL_UNSAFE;
- // 创建节点的类型
- CreateMode createMode = CreateMode.PERSISTENT;
-
- String result = null;
- try {
- result = zkClient.create(path, data.getBytes(), aclList, createMode);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return result;
- }
httpclent 环境数据
- {
- "dev": {
- "name": "value",
- "test": "test",
- "nodename": "node1",
- "data": "我是测试数据1"
- }
- }
- @GetMapping("/getData/{nodename}")
- public String getData(@PathVariable("nodename") String nodename){
- //数据的描述信息,包括版本号,ACL权限,子节点信息等等
- Stat stat = new Stat();
- //返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
- byte[] bytes;
- String path="/"+nodename;
- try {
- bytes = zkClient.getData(path, false, stat);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- //打印结果
- System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
- System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0
- return new String(bytes);
- }
删除和更新操作,必须获取到版本号才能进行修改
- @GetMapping("/setData/{nodename}/{data}")
- public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1){
- String path = "/"+nodename;
- String data = data1;
- int version = 0;
-
- Stat stat = null;
- try {
- stat = zkClient.setData(path, data.getBytes(), version);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return stat.toString();
- }
-
- @GetMapping("/deleteNode/{nodename}")
- public String deleteNode(@PathVariable("nodename")String nodename){
- String path = "/"+nodename;
- int version = 0;
- try {
- zkClient.delete(path, version);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return "OK!";
- }
- package com.scm.springbootzookper.watch;
-
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MyWatcher implements Watcher {
- @Override
- public void process(WatchedEvent watchedEvent) {
- Event.KeeperState state = watchedEvent.getState();
- Event.EventType type = watchedEvent.getType();
- System.out.println("检测到节点发生变化.....");
- System.out.println("节点名称:"+state.name());
- System.out.println("事件类型:"+type.name());
- System.out.println("节点路径"+watchedEvent.getPath());
-
- }
- }
- @GetMapping("/setData/{nodename}/{data}")
- public String setData(@PathVariable("nodename")String nodename, @PathVariable("data") String data1) throws InterruptedException, KeeperException {
- String path = "/"+nodename;
- zkClient.exists(path, new MyWatcher());
- String data = data1;
- // 这里必须先拿到版本号才能更新
- int version =5;
-
- Stat stat = null;
- try {
- stat = zkClient.setData(path, data.getBytes(), version);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- return stat.toString();
- }
需要注意的是,注册一次监听器只能使用一次,使用完就失效了。
串行执行。客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。
判断通知时节点的更改类型,进行其他操作。
- package com.scm.springbootzookper.watch;
-
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.springframework.stereotype.Component;
-
- @Component
- public class MyWatcher implements Watcher {
- @Override
- public void process(WatchedEvent watchedEvent) {
- Event.KeeperState state = watchedEvent.getState();
- Event.EventType type = watchedEvent.getType();
- if (Event.EventType.NodeDataChanged.getIntValue()==type.getIntValue()) {
- System.out.println("节点被修改了!");
- }
- if (Event.EventType.NodeDeleted.getIntValue()==type.getIntValue()) {
- System.out.println("节点被删除了!");
- }
- System.out.println("检测到节点发生变化.....");
- System.out.println("节点名称:"+state.name());
- System.out.println("事件类型:"+type.name());
- System.out.println("节点路径"+watchedEvent.getPath());
-
- }
- }
可以进行一些业务操作。
以下是Watch接口的源码,我们可以注意到其中的枚举类型,
- //
- // Source code recreated from a .class file by IntelliJ IDEA
- // (powered by FernFlower decompiler)
- //
-
- package org.apache.zookeeper;
-
- public interface Watcher {
- void process(WatchedEvent var1);
-
- public interface Event {
- public static enum EventType {
- None(-1),
- NodeCreated(1),
- NodeDeleted(2),
- NodeDataChanged(3),
- NodeChildrenChanged(4);
-
- private final int intValue;
-
- private EventType(int intValue) {
- this.intValue = intValue;
- }
-
- public int getIntValue() {
- return this.intValue;
- }
-
- public static EventType fromInt(int intValue) {
- switch (intValue) {
- case -1:
- return None;
- case 0:
- default:
- throw new RuntimeException("Invalid integer value for conversion to EventType");
- case 1:
- return NodeCreated;
- case 2:
- return NodeDeleted;
- case 3:
- return NodeDataChanged;
- case 4:
- return NodeChildrenChanged;
- }
- }
- }
-
- public static enum KeeperState {
- /** @deprecated */
- @Deprecated
- Unknown(-1),
- Disconnected(0),
- /** @deprecated */
- @Deprecated
- NoSyncConnected(1),
- SyncConnected(3),
- AuthFailed(4),
- ConnectedReadOnly(5),
- SaslAuthenticated(6),
- Expired(-112);
-
- private final int intValue;
-
- private KeeperState(int intValue) {
- this.intValue = intValue;
- }
-
- public int getIntValue() {
- return this.intValue;
- }
-
- public static KeeperState fromInt(int intValue) {
- switch (intValue) {
- case -112:
- return Expired;
- case -1:
- return Unknown;
- case 0:
- return Disconnected;
- case 1:
- return NoSyncConnected;
- case 3:
- return SyncConnected;
- case 4:
- return AuthFailed;
- case 5:
- return ConnectedReadOnly;
- case 6:
- return SaslAuthenticated;
- default:
- throw new RuntimeException("Invalid integer value for conversion to KeeperState");
- }
- }
- }
- }
- }
END.........
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。