当前位置:   article > 正文

Zookeeper原理解析和使用_treecachelistener

treecachelistener

目录        

1、zookeeper基础介绍

2、zookeeper权限控制

3、zookeeper监听事件(Watcher)以curator客户端为例

        1、zookeeper基础介绍

        zookeeper是Google开源的一个分布式协调服务,数据结构类似于文件系统结构,他是由znode结构构成,每一个znode节点相当于一个文件,可以进行数据存储,最多能存1M。节点又分为两种类型,一种是持久节点,一种是临时节点,而每种节点又分为普通节点和顺序节点。相当于一共四种类型节点:持久节点、持久性顺序节点、临时节点、临时性顺序节点

        zookeeper可以实现诸如发布订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Master选举、分布式同步、分布式锁、事件监听等功能。由于zookeeper是TCP长连接,所以其性能是非常高的。

        zookeeper处理基本的增删改查外,还提供了事件监听和权限控制。而且zookeeper的安装也是非常简单,只需要解压后将conf文件中的 zoo_sample.cfg改为 zoo.cfg即可使用,不需要额外有其他额外的配置。

        zookeeper保证了顺序一致性、原子性、可靠性、最终一致性。

        其内部封装好了简单易出错的关键服务,并在保证高性能、高稳定的情况下提供了简单易用的接口给用户使用。其集群模式的性能也是非常高的,目前kafka、Dubbo等都使用到了zookeeper,可见其强大的性能。目前有三种java客户端。

        zookeeper客户端:该客户端属于zookeeper原生客户端,没有做过多的封装,对于开发人员的使用挑战性比较大;

        zkClient客户端:是在zookeeper客户端的基础上进行了一些封装,对原生客户端进行了一些封装,使得开发人员可以进行比较好的使用,不用担心session会话超时等问题。zookeeper+springBoot之前版本是集成的该客户端

       curator客户端:该客户端是目前比较流行推荐使用的。该客户端在原生客户端的基础上进行了比较完整的抽象封装,还提供了分布式锁、链式调用等多种方案。对于开发人员时非常友好的。目前zookeeper+springBoot高版本集成的就是此客户端。

        2、zookeeper的权限控制

        zookeeper内置了一些权限访问设置,分为三种:

        一种是ip权限,通常是一个IP或一个IP段。也就是说只有该IP或IP段的人创建的节点,只有他们可以进行增删改查。其他IP段的是不允许进行增删改查的;

        一种是word权限,该权限是创建的节点所有人都可以进行增删改查,没有任何权限的控制。这种权限一般只是自己测试的时候使用,在生产环境上一般不会出现,容易发生误操作或暴露信息等问题;

        一种是Digest权限,该权限是使用最多也是最为常见的权限,是通过用户名和密码进行权限设置的。使用addauth进行添加用户,添加的用户可以对自己创建的节点进行增删改查,而且多个人可以同时使用一个用户进行数据操作。值得注意的是:在添加用户名和密码时,用户名和密码会经过两次加密,分别是SHA-1加密和Base64编码。但是在添加认证用户时是使用明文添加的,只有创建用户时才会使用密文进行加密

        使用命令进行添加用户和认证用户

        对于权限zookeeper分为五种:增、删、改、查、管理权限,这五种权限简写为crwda(即每个单词首字母缩写)。

  1. //test01节点进行用户添加并权限设置
  2. //setAcl /root digest:用户名:密码密文:权限
  3. setAcl /test01 digest:xiaobai:vBhnk6nHs6kcnB0jvE/Crtv0xwM=:crwda
  4. //用户认证
  5. //addauth digest 用户名:密码明文 权限
  6. addauth digest xiaobai:123456 crwda

3、zookeeper监听事件(Watcher)以curator客户端为例 

        zookeeper主要的作用之一就是当节点发生改变时可以进行集群通知,以便做出合理的处理,而感知节点变化就需要监听和通知,这是zookeeper实现负载均衡、协调管理、注册发布等功能的主要部分。

zookeeper监听的原理:

        1、当创建zookeeper客户端时,内部会默认创建两个线程;

                一个是connet线程,主要负责网络通信连接;

                一个是Listener线程,主要负责监听;

        2、客户端通过connet线程连接服务器;使用getChildren("/",true);其中"/"表示监听的根目录,true表示监听;false表示不监听。

        3、在zookeeper监听列表中,将注册的监听事件放入到监听列表中,表示这个服务器中的"/"目录被客户端监听。

        4、一旦被监听的目录下,数据或路径发生改变,zookeeper就会将这个消息发送给Listener线程。

        5、Listener线程内部调用process方法,采取相应的措施进行操作。

        zookeeper事件类型主要有以下几种:

        

zookeeper的监听主要分为两种:

        标准的观察者模式:该模式主要是使用的watcher监听器,和zookeeper原生监听器一样,该监听器只有一种,那就是一次性触发器,对节点只进行一次监听,监听完后立刻失效,并且也不会对子节点进行监听。

        缓存监听模式:该模式引入了一种本地缓存视图的Cache机制,来实现对zookeeper服务端的事件监听。Cache事件监听可以理解为本地缓存视图与远程zookeeper视图的比对过程。Cache提供了重复注册的功能。其原理就是Cache在客户端缓存了znode的各种状态,不断的和zookeeper集群进行比较,而且其比对也十分简单,如果未发生变化就会一直发送true,一旦发生改变就会发送false。如果还是不太理解可以结合上面的监听器原理解析图进行理解。

缓存监听模式分为三种:Path Catche 、Node Cache、Tree Cache

        Path Cache:用来观察znode的子节点并缓存状态,如果znode的子节点被删除、修改、创建,那么就会触发监听事件。

        Node Cache:用来观察znode本身的状态,如果znode本身被创建、更新、删除,那么就会触发监听事件。

        Tree Cache:用来观察所有节点和数据的变化,监听器的注册接口为:TreeCacheListener。

Watch特性:

        一次性触发器:客户端在节点设置了Watch监听事件后,对该节点(只对节点本身有效,其子节点是无效的)进行修改或删除都会监听到消息。但是只能监听一次;例如:客户端设置getData("/znode",true)后,第一次对该节点进行修改或删除,都会触发监听。但是再次修改或删除时,则不会进行监听。如果需要再次监听则需要重新设置监听。

  1. package com.bpc.client;
  2. import org.apache.curator.RetryPolicy;
  3. import org.apache.curator.framework.CuratorFramework;
  4. import org.apache.curator.framework.CuratorFrameworkFactory;
  5. import org.apache.curator.retry.RetryNTimes;
  6. /**
  7. * @author xiaobai
  8. */
  9. public class CuratorTest03 {
  10. /**
  11. * 连接地址
  12. */
  13. String ZK_ADDRESS = "127.0.0:2181";
  14. /**
  15. * 重试策略,RetryPolicy是个接口,里面的每一种实现都是一种策略。
  16. * 可以根据自身业务情况进行选择
  17. */
  18. RetryPolicy policy = new RetryNTimes(3, 2000);
  19. /**
  20. * curator客户端开发是一种流式编码方式,就是不断的点点点
  21. * 老版本创建连接方式
  22. */
  23. /*CuratorFramework client = CuratorFrameworkFactory.builder()
  24. //连接地址
  25. .connectString(ZK_ADDRESS)
  26. //session超时时间
  27. .sessionTimeoutMs(50000)
  28. //连接超时时间
  29. .connectionTimeoutMs(5000)
  30. //重试策略
  31. .retryPolicy(policy)
  32. .build();*/
  33. CuratorFramework client = null;
  34. public CuratorTest03() {
  35. /**
  36. * 新版本创建连接方式
  37. */
  38. client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, policy);
  39. //启动客户端
  40. client.start();
  41. }
  42. public CuratorFramework getClient() {
  43. return client;
  44. }
  45. public void setClient(CuratorFramework client) {
  46. this.client = client;
  47. }
  48. }
  1. public static void main(String[] args) throws Exception {
  2. CuratorTest03 curator = new CuratorTest03();
  3. test01(curator);
  4. //休眠时间长一点,以便于测试事件的监听
  5. Thread.sleep(1000000000);
  6. }
  7. public static void test01(CuratorTest03 curator) throws Exception {
  8. String path = "/test001";
  9. //创建节点
  10. // curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
  11. //建立一次性节点监听
  12. curator.client.getData().usingWatcher((Watcher) (WatchedEvent event) -> {
  13. //可以对该节点数据进行操作,此处可以监听到
  14. System.out.println("对节点 [" + event.getPath() + "] 进行了操作,操作事件:" + event.getType().name());
  15. }).forPath(path);
  16. }

       

        单节点触发器:PathCache,该触发器只能对当前节点的直属子节点一直进行监听,其子节点的子节点也是监听不到的。直属子节点的初始化、创建、修改、删除等一系列操作都可以被监听到,并可以一直进行监听。

  1. public static void test04(CuratorTest03 curator) throws Exception {
  2. String path = "/test003";
  3. //创建节点
  4. curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
  5. PathChildrenCache childrenCache = new PathChildrenCache(curator.client, path, false);
  6. //必须设置启动模式为POST_INITIALIZED_EVENT才可以进行监听
  7. childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
  8. childrenCache.getListenable().addListener(
  9. (CuratorFramework client, PathChildrenCacheEvent event) -> {
  10. System.out.println("子节点监听,进行" + event.getType() + "操作");
  11. });
  12. }

        单节点触发器:NodeCache,该触发器会一直监听当前节点的增删改操作。不会监听其子节点的变化。

  1. public static void test06(CuratorTest03 curator) throws Exception {
  2. String path = "/test006";
  3. //创建节点
  4. curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
  5. //创建监听
  6. NodeCache nodeCache = new NodeCache(curator.client, path);
  7. nodeCache.start(true);
  8. nodeCache.getListenable().addListener(()->{
  9. System.out.println("监听到:"+nodeCache.getPath()+"节点发生变化!");
  10. });
  11. //修改数据
  12. curator.client.setData().forPath(path,"我叫小白!".getBytes());
  13. }

        树形节点触发器:Tree Node ,该触发器会一直监听本节点及下面的所有子节点 的增删改操作,相当于一个树下面的所有节点他全部会进行监听。

  1. public static void test07(CuratorTest03 curator) throws Exception {
  2. String path = "/test007";
  3. //创建监听
  4. TreeCache treeCache = new TreeCache(curator.client,path);
  5. treeCache.start();
  6. TreeCacheListener listener = new TreeCacheListener() {
  7. @Override
  8. public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
  9. ChildData data = treeCacheEvent.getData();
  10. System.out.println("监听到:"+data.getPath()+"节点发生变化!,监听状态为:"+treeCacheEvent.getType());
  11. }
  12. };
  13. treeCache.getListenable().addListener(listener);
  14. //创建节点
  15. curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path);
  16. //修改数据
  17. curator.client.setData().forPath(path,"我叫小白!".getBytes());
  18. //创建子节点
  19. curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/tetst01");
  20. //添加子节点内容
  21. curator.client.setData().forPath(path+"/tetst01","测试创建子节点内容".getBytes());
  22. //删除子节点
  23. curator.client.delete().forPath(path+"/tetst01");
  24. //删除本节点
  25. curator.client.delete().forPath(path);
  26. }

        异步通知触发器:CuratorListener监听,该监听器主要针对background通知和错误通知。使用此监听器后,只要是调用inBackground方法都会异步接收到监听事件。不管操作的成功或失败,他都能监听到增删改查所有操作,所以该监听器不可靠。

  1. public static void test05(CuratorTest03 curator) throws Exception {
  2. String path ="/test004";
  3. //创建节点
  4. curator.client.create().withMode(CreateMode.PERSISTENT).forPath(path,"wo jiao xiao bai !".getBytes());
  5. CuratorListener listener = new CuratorListener() {
  6. @Override
  7. public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
  8. System.out.println("监听事件被触发,该操作是:"+event.getType());
  9. }
  10. };
  11. //添加监听事件
  12. curator.client.getCuratorListenable().addListener(listener);
  13. //异步获取节点数据
  14. curator.client.getData().inBackground().forPath(path);
  15. //变更节点数据
  16. curator.client.setData().inBackground().forPath(path,"我叫小白!".getBytes());
  17. //添加子节点
  18. curator.client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path+"/test01");
  19. //创建子节点的子节点
  20. curator.client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath(path+"/test01/tet02");
  21. //修改子节点内容
  22. curator.client.setData().inBackground().forPath(path+"/test01","123".getBytes());
  23. //修改子节点的子节点内容
  24. curator.client.setData().inBackground().forPath(path+"/test01/tet01","456".getBytes());
  25. //删除子节点的子节点
  26. curator.client.create().inBackground().forPath(path+"/test01/test01");
  27. //删除子节点
  28. curator.client.delete().inBackground().forPath(path+"/test01");
  29. //删除本节点
  30. curator.client.delete().inBackground().forPath(path);
  31. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/974763
推荐阅读
相关标签
  

闽ICP备14008679号