当前位置:   article > 正文

分布式框架(三):Zookeeper 整合实战

分布式框架(三):Zookeeper 整合实战

ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:

  • ZooKeeper官方的Java客户端API
  • 第三方的Java客户端API,比如Curator

ZooKeeper官方的客户端

ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。

不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:

  • ZooKeeper的Watcher监测是一次性的,每次触发之后都需要重新进行注册
  • 会话超时之后没有实现重连机制
  • 异常处理烦琐,ZooKeeper提供了很多异常,对于开发人员来说可能根本不知道应该如何处理这些抛出的异常
  • 仅提供了简单的byte[]数组类型的接口,没有提供Java POJO级别的序列化数据处理接口
  • 创建节点时如果抛出异常,需要自行检查节点是否存在
  • 无法实现级联删除

总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用

Zookeeper 原生Java客户端使用引入zookeeper client依赖 注意:保持与服务端版本一致,不然会有很多兼容性的问题

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
方法特点:
  • 所有获取 znode 数据的 API 都可以设置一个 watch 用来监控 znode 的变化。
  • 所有更新 znode 数据的 API 都有两个版本: 无条件更新版本和条件更新版本。如果 version 为 -1,更新为无条件更新。否则只有给定的 version 和 znode 当前的 version 一样,才会进行更新,这样的更新是条件更新。
  • 所有的方法都有同步和异步两个版本。同步版本的方法发送请求给 ZooKeeper 并等待服务器的响应。异步版本把请求放入客户端的请求队列,然后马上返回。异步版本通过 callback 来接受来 自服务端的响应。

demo

final CountDownLatch countDownLatch=new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR,4000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if(Event.KeeperState.SyncConnected==event.getState()&& event.getType()== Event.EventType.None){
            //如果收到了服务端的响应事件,连接成功
            countDownLatch.countDown();
            System.out.println("连接建立");
        }
    }
});
zooKeeper.addAuthInfo("digest", "xxxx:xxxxxxx".getBytes(StandardCharsets.UTF_8));
System.out.printf("连接中");
countDownLatch.await();
//CONNECTED
System.out.println(zooKeeper.getState());

Stat tmp = zooKeeper.exists("/tmp", false);
if (tmp == null) {
    zooKeeper.create("/tmp", "mx".getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zooKeeper.addWatch("/tmp", new Watcher() {
    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println(watchedEvent);
    }
}, AddWatchMode.PERSISTENT_RECURSIVE);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

Curator开源客户端使用

Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。

Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。

在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。

官网:https://curator.apache.org/

引入依赖

Curator 包含了几个包:

  • curator-framework是对ZooKeeper的底层API的一些封装。
  • curator-client提供了一些客户端的操作,例如重试策略等。
  • curator-recipes封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>

<!--curator-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.1.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

demo

创建一个客户端实例:

  • 使用工厂类CuratorFrameworkFactory的静态newClient()方法
  • 使用工厂类CuratorFrameworkFactory的静态builder构造者方法
//构建客户端实例
CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
        .connectString(CLUSTER_CONNECT_STR)
        .sessionTimeoutMs(4000)
        .retryPolicy(new ExponentialBackoffRetry(1000,3))
        .namespace("")
        .authorization("digest", "xxxx:xxxxxx".getBytes(StandardCharsets.UTF_8))
        .build();
//启动客户端
curatorFramework.start();
Stat stat=new Stat();
//查询节点数据
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/user");
System.out.println(new String(bytes));

String forPath = curatorFramework
        .create()
        .creatingParentsIfNeeded()
        // protection 模式,防止由于异常原因,导致僵尸节点
        .withProtection()
        .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());
curatorFramework.close();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

Zookeeper在分布式中命名服务的实践

ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名

分布式API目录

为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。

著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:

  • 服务提供者(Service Provider)在启动的时候,向ZooKeeper上的指定节点/dubbo/${serviceName}/providers写入自己的API地址,这个操作就相当于服务的公开。
  • 服务消费者(Consumer)启动的时候,订阅节点/dubbo/{serviceName}/providers下的服务提供者的URL地址,获得所有服务提供者的API。

分布式节点命名

一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。

如何为大量的动态节点命名呢?一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。

可用于生成集群节点的编号的方案:

  1. 使用数据库的自增ID特性,用数据表存储机器的MAC地址或者IP来维护
  2. 使用ZooKeeper持久顺序节点的顺序特性来维护节点的NodeId编号

在第2种方案中,集群节点命名服务的基本流程是:

  • 启动节点服务,连接ZooKeeper,检查命名服务根节点是否存在,如果不存在,就创建系统的根节点
  • 在根节点下创建一个临时顺序ZNode节点,取回ZNode的编号把它作为分布式系统中节点的NODEID
  • 如果临时节点太多,可以根据需要删除临时顺序ZNode节点

分布式的ID生成器

在分布式系统中,分布式ID生成器的使用场景非常之多:

  • 大量的数据记录,需要分布式ID
  • 大量的系统消息,需要分布式ID
  • 大量的请求日志,如restful的操作记录,需要唯一标识,以便进行后续的用户行为分析和调用链路分析
  • 分布式节点的命名服务,往往也需要分布式ID

传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统,这种系统需要满足以下需求:

  1. 全局唯一:不能出现重复ID
  2. 高可用:ID生成系统是基础系统,被许多关键系统调用,一旦宕机,就会造成严重影响

有哪些分布式的ID生成器方案呢?大致如下:

  • Java的UUID (无序、无意义、db B++树效率)
  • 分布式缓存Redis生成ID:利用Redis的原子操作INCR和INCRBY,生成全局唯一的ID (Redis可靠性)
  • Twitter的SnowFlake算法
  • ZooKeeper生成ID:利用ZooKeeper的顺序节点,生成全局唯一的ID
  • MongoDb的ObjectId:MongoDB是一个分布式的非结构化NoSQL数据库,每插入一条记录会自动生成全局唯一的一个_id字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID

基于Zookeeper实现分布式ID生成器

在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力

  • PERSISTENT_SEQUENTIAL持久化顺序节点
  • EPHEMERAL_SEQUENTIAL临时顺序节点

ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。
可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID

基于Zookeeper实现SnowFlakeID算法

在这里插入图片描述

Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。

SnowFlakeID的四个部分,具体介绍如下:

  1. 第一位 占用1 bit,其值始终是0,没有实际作用。
  2. 时间戳 占用41 bit,精确到毫秒,总共可以容纳约69年的时间。
  3. 工作机器id占用10 bit,最多可以容纳1024个节点。
  4. 序列号 占用12 bit。这个值在同一毫秒同一节点上从0开始不断累加,最多可以累加到4095。

在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。

SnowFlake算法的优点:

  • 生成ID时不依赖于数据库,完全在内存生成,高性能和高可用性
  • 容量大,每秒可生成几百万个ID
  • ID呈趋势递增,后续插入数据库的索引树时,性能较高

SnowFlake算法的缺点:

  • 依赖于系统时钟的一致性,如果某台机器的系统时钟回拨了,有可能造成ID冲突,或者ID乱序
  • 在启动之前,如果这台机器的系统时间回拨过,那么有可能出现ID重复的危险

Zookeeper 分布式锁

  • 什么是分布式锁
    在单体的应用开发场景中涉及并发同步的时候,大家往往采用Synchronized(同步)或者其他同一个JVM内Lock机制来解决多线程间的同步问题。在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。

  • 目前分布式锁,比较成熟、主流的方案:

    1. 基于数据库的分布式锁。db操作性能较差,并且有锁表的风险,一般不考虑。
    2. 基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
    3. 基于ZooKeeper的分布式锁。适用于高可靠(高可用),而并发量不是太高的场景。

基于Redis实现分布式锁 (高可用问题

基于数据库设计 – 利用数据库的唯一索引实现 (数据库性能问题)

在这里插入图片描述

基于Zookeeper – 临时 znode (羊群效应问题

在这里插入图片描述

基于Zookeeper – 临时顺序节点 公平锁 (频繁的创建和删除节点性能问题

在这里插入图片描述

Curator 可重入分布式锁工作流程

在这里插入图片描述

总结

  • 优点:ZooKeeper分布式锁(如InterProcessMutex),具备高可用、可重入、阻塞锁特性,可解决失效死锁问题,使用起来也较为简单。
  • 缺点:因为需要频繁的创建和删除节点,性能上不如Redis。

在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。

Zookeeper注册中心(服务注册和服务发现 CP)

基于 ZooKeeper 本身的特性可以实现注册中心

https://spring.io/projects/spring-cloud-zookeeper#learn

pom 依赖

父工程 注意: springboot和springcloud的版本兼容问题
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.2.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <java.version>1.8</java.version>
    <spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
子工程 注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题
<!-- zookeeper服务注册与发现 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- zookeeper client -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration

配置zookeeper注册中心地址 注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定

spring:
  cloud:
    zookeeper:    
      connect-string: localhost:2181
      discovery:
        instance-host: 127.0.0.1
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

整合rabbin/feign进行服务调用

rabbin
@Configuration
public class RestConfig {
    @Bean
    @LoadBalanced  //客户端负载均衡
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
@Autowired
private RestTemplate restTemplate;
String url = "http://mall-order/order/findOrderByUserId/"+id;
R result = restTemplate.getForObject(url,R.class);
  • 1
  • 2
  • 3
  • 4
feign
@FeignClient(value = "open-api-service-rabbitmq", fallbackFactory = RabbitMQSenderFeignFallback.class)
public interface RabbitMQSenderFeign {
    @PostMapping("/sender/syncOrder")
    boolean syncOrder(@RequestBody SyncOrder syncOrder);
    ....
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
@Autowired
OrderFeignService orderFeignService;
//feign调用封装 ribbon调用 rpc调用 http
R result = orderFeignService.findOrderByUserId(id);
  • 1
  • 2
  • 3
  • 4

源码分析

org.springframework.cloud.client.discovery.DiscoveryClient

org.springframework.cloud.client.serviceregistry.ServiceRegistry

org.springframework.boot.autoconfigure.AutoConfigureBefore

spring-cloud 服务注册接口

package org.springframework.cloud.client.serviceregistry;

public interface ServiceRegistry<R extends Registration> {
    void register(R registration);

    void deregister(R registration);

    void close();

    void setStatus(R registration, String status);

    <T> T getStatus(R registration);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

版本依赖

spring-cloud 和 spring-boot (https://spring.io/projects/spring-cloud)
在这里插入图片描述

spring-cloud-alibaba (https://github.com/alibaba/spring-cloud-alibaba/wiki)

在这里插入图片描述

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/911362
推荐阅读
相关标签
  

闽ICP备14008679号