赞
踩
ZooKeeper应用的开发主要通过Java客户端API去连接和操作ZooKeeper集群。可供选择的Java客户端API有:
ZooKeeper官方的客户端API提供了基本的操作。例如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。
不过,对于实际开发来说,ZooKeeper官方API有一些不足之处,具体如下:
总之,ZooKeeper官方API功能比较简单,在实际开发过程中比较笨重,一般不推荐使用
注意:保持与服务端版本一致,不然会有很多兼容性的问题
<!-- zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
final CountDownLatch countDownLatch=new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(CLUSTER_CONNECT_STR,4000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(Event.KeeperState.SyncConnected==event.getState()&& event.getType()== Event.EventType.None){
//如果收到了服务端的响应事件,连接成功
countDownLatch.countDown();
System.out.println("连接建立");
}
}
});
zooKeeper.addAuthInfo("digest", "xxxx:xxxxxxx".getBytes(StandardCharsets.UTF_8));
System.out.printf("连接中");
countDownLatch.await();
//CONNECTED
System.out.println(zooKeeper.getState());
Stat tmp = zooKeeper.exists("/tmp", false);
if (tmp == null) {
zooKeeper.create("/tmp", "mx".getBytes(StandardCharsets.UTF_8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
zooKeeper.addWatch("/tmp", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent);
}
}, AddWatchMode.PERSISTENT_RECURSIVE);
Curator是Netflix公司开源的一套ZooKeeper客户端框架,和ZkClient一样它解决了非常底层的细节开发工作,包括连接、重连、反复注册Watcher的问题以及NodeExistsException异常等。
Curator是Apache基金会的顶级项目之一,Curator具有更加完善的文档,另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。
Curator还为ZooKeeper客户端框架提供了一些比较普遍的、开箱即用的、分布式开发用的解决方案,例如Recipe、共享锁服务、Master选举机制和分布式计算器等,帮助开发者避免了“重复造轮子”的无效开发工作。
在实际的开发场景中,使用Curator客户端就足以应付日常的ZooKeeper集群操作的需求。
官网:https://curator.apache.org/
Curator 包含了几个包:
<!-- zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
创建一个客户端实例:
//构建客户端实例
CuratorFramework curatorFramework= CuratorFrameworkFactory.builder()
.connectString(CLUSTER_CONNECT_STR)
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("")
.authorization("digest", "xxxx:xxxxxx".getBytes(StandardCharsets.UTF_8))
.build();
//启动客户端
curatorFramework.start();
Stat stat=new Stat();
//查询节点数据
byte[] bytes = curatorFramework.getData().storingStatIn(stat).forPath("/user");
System.out.println(new String(bytes));
String forPath = curatorFramework
.create()
.creatingParentsIfNeeded()
// protection 模式,防止由于异常原因,导致僵尸节点
.withProtection()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-node", "some-data".getBytes());
curatorFramework.close();
ZooKeeper的命名服务主要是利用ZooKeeper节点的树形分层结构和子节点的顺序维护能力,来为分布式系统中的资源命名
为分布式系统中各种API接口服务的名称、链接地址,提供类似JNDI(Java命名和目录接口)中的文件系统的功能。借助于ZooKeeper的树形分层结构就能提供分布式的API调用功能。
著名的Dubbo分布式框架就是应用了ZooKeeper的分布式的JNDI功能。在Dubbo中,使用ZooKeeper维护的全局服务接口API的地址列表。大致的思路为:
一个分布式系统通常会由很多的节点组成,节点的数量不是固定的,而是不断动态变化的。比如说,当业务不断膨胀和流量洪峰到来时,大量的节点可能会动态加入到集群中。而一旦流量洪峰过去了,就需要下线大量的节点。再比如说,由于机器或者网络的原因,一些节点会主动离开集群。
如何为大量的动态节点命名呢?一种简单的办法是可以通过配置文件,手动为每一个节点命名。但是,如果节点数据量太大,或者说变动频繁,手动命名则是不现实的,这就需要用到分布式节点的命名服务。
可用于生成集群节点的编号的方案:
在第2种方案中,集群节点命名服务的基本流程是:
在分布式系统中,分布式ID生成器的使用场景非常之多:
传统的数据库自增主键已经不能满足需求。在分布式系统环境中,迫切需要一种全新的唯一ID系统,这种系统需要满足以下需求:
有哪些分布式的ID生成器方案呢?大致如下:
_id
字段值,它是一个12字节的字符串,可以作为分布式系统中全局唯一的ID在ZooKeeper节点的四种类型中,其中有以下两种类型具备自动编号的能力
ZooKeeper的每一个节点都会为它的第一级子节点维护一份顺序编号,会记录每个子节点创建的先后顺序,这个顺序编号是分布式同步的,也是全局唯一的。
可以通过创建ZooKeeper的临时顺序节点的方法,生成全局唯一的ID
Twitter(推特)的SnowFlake算法是一种著名的分布式服务器用户ID生成算法。SnowFlake算法所生成的ID是一个64bit的长整型数字,如图10-2所示。这个64bit被划分成四个部分,其中后面三个部分分别表示时间戳、工作机器ID、序列号。
SnowFlakeID的四个部分,具体介绍如下:
在工作节点达到1024顶配的场景下,SnowFlake算法在同一毫秒最多可以生成的ID数量为: 1024 * 4096 =4194304,在绝大多数并发场景下都是够用的。
SnowFlake算法的优点:
SnowFlake算法的缺点:
什么是分布式锁
在单体的应用开发场景中涉及并发同步的时候,大家往往采用Synchronized(同步)或者其他同一个JVM内Lock机制来解决多线程间的同步问题。在分布式集群工作的开发场景中,就需要一种更加高级的锁机制来处理跨机器的进程之间的数据同步问题,这种跨机器的锁就是分布式锁。
目前分布式锁,比较成熟、主流的方案:
高可用问题
)数据库性能问题
)羊群效应问题
)频繁的创建和删除节点性能问题
)在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁。而由于ZooKeeper的高可用性,因此在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。
服务注册和服务发现 CP
)基于 ZooKeeper 本身的特性可以实现注册中心
https://spring.io/projects/spring-cloud-zookeeper#learn
注意: springboot和springcloud的版本兼容问题
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
注意: zookeeper客户端依赖和zookeeper sever的版本兼容问题
<!-- zookeeper服务注册与发现 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.8.0</version>
</dependency>
Spring Cloud整合Zookeeper注册中心核心源码入口: ZookeeperDiscoveryClientConfiguration
注意:如果address有问题,会出现找不到服务的情况,可以通过instance-host配置指定
spring:
cloud:
zookeeper:
connect-string: localhost:2181
discovery:
instance-host: 127.0.0.1
@Configuration
public class RestConfig {
@Bean
@LoadBalanced //客户端负载均衡
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
@Autowired
private RestTemplate restTemplate;
String url = "http://mall-order/order/findOrderByUserId/"+id;
R result = restTemplate.getForObject(url,R.class);
@FeignClient(value = "open-api-service-rabbitmq", fallbackFactory = RabbitMQSenderFeignFallback.class)
public interface RabbitMQSenderFeign {
@PostMapping("/sender/syncOrder")
boolean syncOrder(@RequestBody SyncOrder syncOrder);
....
}
@Autowired
OrderFeignService orderFeignService;
//feign调用封装 ribbon调用 rpc调用 http
R result = orderFeignService.findOrderByUserId(id);
org.springframework.cloud.client.discovery.DiscoveryClient
org.springframework.cloud.client.serviceregistry.ServiceRegistry
org.springframework.boot.autoconfigure.AutoConfigureBefore
spring-cloud 服务注册接口
package org.springframework.cloud.client.serviceregistry;
public interface ServiceRegistry<R extends Registration> {
void register(R registration);
void deregister(R registration);
void close();
void setStatus(R registration, String status);
<T> T getStatus(R registration);
}
spring-cloud 和 spring-boot (https://spring.io/projects/spring-cloud)
spring-cloud-alibaba (https://github.com/alibaba/spring-cloud-alibaba/wiki)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。