赞
踩
创建ZK节点
- 默认情况下,不添加-s或者-e参数的,创建的是持久节点。
# 格式:【参数:-s:顺序节点;-e:临时节点】
create [-s] [-e] path data acl
# 实例
create /zk-book java
读取节点信息ls命令和set命令。
# 格式
ls path [watch]
# 实例
ls /
使用get命令,可以获取zookeeper指定节点的数据内容和属性信息。
# 格式
get path [watch]
# 实例
get /zk-book
使用set命令,可以更新指定节点的数据内容。
# 格式
set path data [version]
# 实例
set /zk-book springcloud
删除zookeeper上的指定节点。
# 格式
delete path [version]
# 实例
delete /zk-book
每个节点都有属于自己的状态信息,这就很像每个人的身份信息一样。
# 格式
stat path
# 实例
stat /a
状态属性 | 说明 |
---|---|
czxid | 表示该数据节点被创建时的事务 ID |
mzxid | 表示该数据节点最后一次被更新时的事务 ID |
pzxid | 表示该数据节点的子节点列表最后一次被更新时的事务 ID |
ctime | 表示该节点的创建时间 |
mtime | 表示该节点最后一次被更新的时间 |
version | 数据节点的版本号 |
cversion | 子节点的版本号 |
aversion | 节点的ACL版本号 |
ephemeralOwner | 创建该临时节点的会话 SessionID 如果该节点是持久节点,那么这个属性值是 0 |
dataLength | 数据内容的长度 |
numChildren | 当前节点的子节点个数 |
zooKeeper 提供了分布式数据的发布/订阅功能。一个典型的发布/订阅模型系统定义了一种一对多的订阅关系,能够让多个订阅者同时监听某一个主题对象,当这个主题对象自身状态变化时,会通知所有订阅者,使它们能够做出相应的处理。
# 格式
ls -w path
# 实例
ls -w /zk-book
# 格式
get -w path
# 实例
get -w /zk-book
在ZooKeeper的实际使用中,我们的做法往往是搭建一个共用的ZooKeeper集群,统一为若干个应用提供服务。在这种情况下,不同的应用之间往往是不会存在共享数据的使用场景的,因此需要解决不同应用之间的权限问题。
# 格式
setAcl path schema:id:permission
# 实例
setAcl /test2 ip:128.0.0.1:crwda
ZooKeeper内置了一些权限控制方案(schema),可以用以下方案为每个节点设置权限:
方案 | 描述 |
---|---|
world | 只有一个用户:anyone,代表所有人(默认) |
ip | 使用IP地址认证 |
auth | 使用已添加认证的用户认证 |
digest | 使用“用户名:密码”方式认证 |
授权对象ID是指,权限赋予的用户或者一个实体,例如:IP 地址或者机器。授权模式 schema 与 授权对象 ID 之间关系:
权限模式 | 授权对象 |
---|---|
IP | 通常是一个IP地址或是IP段,例如“192.168.66.101” |
Digest | 自定义,通常是“username:BASE64(SHA-1(username:password))” |
World | 只有一个ID:“anyone” |
Super | 与Digest模式一致 |
权限 | ACL简写 | 描述 |
---|---|---|
CREATE | c | 可以创建子节点 |
DELETE | d | 可以删除子节点(仅下一级节点) |
READ | r | 可以读取节点数据及显示子节点列表 |
WRITE | w | 可以设置节点数据 |
ADMIN | a | 可以设置节点访问控制列表权限 |
命令 | 使用方式 | 描述 |
---|---|---|
getAcl | getAcl | 读取ACL权限 |
setAcl | setAcl | 设置ACL权限 |
addauth | addauth | 添加认证用户 |
之前使用stat命令来验证ZooKeeper服务器是否启动成功,这里的stat命令就是ZooKeeper 中最为典型的命令之一。ZooKeeper中有很多类似的命令,它们的长度通常都是4个英文字母,因此我们称之为“四字命令”。
vim /usr/local/zookeeper/conf/zoo.cfg
4lw.commands.whitelist=*
输出Zookeeper相关服务的详细配置信息,如客户端端口,数据存储路径、最大连接数、日志路径、数据同步端口、主节点推举端口、session超时时间等等。
echo conf| nc localhost 2181
cons 命令用于输出当前这台服务器上所有客户端连接的详细信息,包括每个客户端的客户端IP、会话ID和最后一次与服务器交互的操作类型等。
echo cons | nc localhost 218
ruok命令用于输出当前ZooKeeper服务器是否正在运行。该命令的名字非常有趣,其谐音正好是“Are you ok”。执行该命令后,如果当前ZooKeeper服务器正在运行,那么返回“imok”, 否则没有任何响应输出。
echo ruok | nc localhost 2181
stat命令用于获取ZooKeeper服务器的运行时状态信息,包括基本的ZooKeeper版本、打包信息、运行时角色、集群数据节点个数等信息,另外还会将当前服务器的客户端连接信息打印出来。
echo stat | nc localhost 2181
列出集群的关键性能数据,包括zk的版本、最大/平均/最小延迟数、数据包接收/发送量、连接数、zk角色(Leader/Follower)、node数量、watch数量、临时节点数。
echo mntr | nc localhost 2181
zookeeper原生api
- 重复注册watcher
- session失效重连
- 异常处理(删除节点不能有子节点,新增节点必须有父节点等)
<dependencies>
<!-- zookeeper原生api -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
</dependencies>
获取zookeeper链接
public static void main(String[] args) throws IOException {
/**
* 创建一个 Zookeeper 的实例
* 此处为一个集群,Zookeeper 的 ip 之间用逗号隔开
* 参数解释:
* param 1 - Zookeeper 的实例 ip ,此处是一个集群,所以配置了多个 ip,用逗号隔开
* param 2 - session 过期时间,单位秒(1000)
* param 3 - 监视者,用于获取监听事件(MyWatch)
*/
ZooKeeper zooKeeper = new ZooKeeper("ip:2181", 5000, null);
// 1. 查看链接状态
System.out.println(zooKeeper.getState());
}
类型 | 含义 |
---|---|
PERSISTENT | 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在 |
PERSISTENT_SEQUENTIAL | 持久化,并带有序列号 |
EPHEMERAL | 临时目录节点,客户端与zookeeper断开连接后,该节点被删除 |
EPHEMERAL_SEQUENTIAL | 临时,并带有序列号 |
/**
* 创建节点
*/
private static void createNodeSync(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
String path = "/node1";
/**
* 参数一:znode名称
* 参数二:节点数据
* 参数三:设置权限
* 参数四:znode类型
*/
String nodePath = zooKeeper.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(nodePath);
}
/** * 读取数据信息 * 1. 节点数据 * 2. 节点数据状态信息 */ private static void getDataSync(ZooKeeper zooKeeper) throws KeeperException, InterruptedException { Stat stat = new Stat(); /** * getData的返回值是该节点的数据值,节点的状态信息会赋值给stat对象 * 参数一:节点名称 * 参数二:监听机制(null不触发) * 参数三:状态信息 */ byte[] data = zooKeeper.getData("/node1", true, stat); System.out.println(new String(data)); System.out.println(stat); }
/**
* 更新节点数据
*/
private static void setDataSync(ZooKeeper zooKeeper) throws InterruptedException, KeeperException {
/**
* 参数一:znode名称
* 参数二:节点数据
* 参数三:该节点的版本,(-1代表全部)
*/
Stat stat = zooKeeper.setData("/node1", "java-zookeeper".getBytes(), -1);
System.out.println(stat);
}
/**
* 删除节点
*/
private static void deleteSync(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
* 参数二:节点版本
*/
zooKeeper.delete("/node1", -1);
}
/**
* 查看节点是否存在
*/
private static void existSync(ZooKeeper zooKeeper) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
* 参数二:监听机制
*/
Stat stat = zooKeeper.exists("/node1", true);
System.out.println(stat);
}
通过zkCli.getchildren(“/”,new watch()){}来注册监听,监听的是整个根节点,但是这个监听只能监听一次。线程休眠是为了让监听等待事件发生,不然会随着程序直接运行完。
/** * 监听节点 */ private static void watcherNodeSync(ZooKeeper zooKeeper) throws InterruptedException, KeeperException { /** * 监听目录节点(只监听一次) * 参数一:监听路径 * 参数二:回调函数 */ zooKeeper.getChildren("/", new Watcher() { public void process(WatchedEvent event) { System.out.println("监听路径为:" + event.getPath()); System.out.println("监听的类型为:" + event.getType()); System.out.println("数据被2货修改了!!!"); } }); Thread.sleep(Long.MAX_VALUE); }
getData监听的为一个节点,同样只监听一次,返回的是该节点的内容。
/** * 监听数据 */ private static void watcherDataSync(ZooKeeper zooKeeper) throws InterruptedException, KeeperException { /** * 监听目录节点(只监听一次) * 参数一:监听路径 * 参数二:回调函数 */ byte[] data = zooKeeper.getData("/node1", new Watcher() { //监听的具体内容 public void process(WatchedEvent event) { System.out.println("监听路径为:" + event.getPath()); System.out.println("监听的类型为:" + event.getType()); System.out.println("数据被2货修改了!!!"); } }, null); System.out.println(new String(data)); Thread.sleep(Long.MAX_VALUE); }
zkclient是Github上一个开源的Zookeeper客户端,在Zookeeper原生 API接口之上进行了包装,是一个更加易用的Zookeeper客户端。同时Zkclient在内部实现了诸如Session超时重连,Watcher反复注册等功能,从而提高开发效率。
<dependencies>
<!-- zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
public static void main(String[] args) {
/**
* 创建会话,多个地址用逗号隔开
* 参数:zookeeper地址端口
*/
ZkClient zk = new ZkClient("ip:2181");
// 1. 查看连接状态
System.out.println(zk.getShutdownTrigger());
}
类型 | 含义 |
---|---|
PERSISTENT | 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在 |
PERSISTENT_SEQUENTIAL | 持久化,并带有序列号 |
EPHEMERAL | 临时目录节点,客户端与zookeeper断开连接后,该节点被删除 |
EPHEMERAL_SEQUENTIAL | 临时,并带有序列号 |
/**
* 创建节点
*/
private static void createNodeSync(ZkClient zk) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
* 参数二:节点数据
* 参数三:
*/
String res = zk.create("/root", "js", CreateMode.PERSISTENT);
}
/**
* 获取节点数据
*/
private static void getDataSync(ZkClient zk) throws InterruptedException, KeeperException {
/**
* 参数一:znode名称
*/
String res = zk.readData("/root");
System.out.println(res);
}
/**
* 修改节点数据
*/
private static void setDataSync(ZkClient zk) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
* 参数二:节点数据
*/
zk.writeData("/root", "javascript");
}
/**
* 删除节点
*/
private static void deleteSync(ZkClient zk) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
*/
zk.delete("/root");
}
/**
* 获取所有节点
*/
private static void getNodesSync(ZkClient zk) throws KeeperException, InterruptedException {
/**
* 参数一:节点名称
*/
List<String> childrens = zk.getChildren("/");
childrens.forEach(System.out::println);
}
/** * 监听节点 */ private static void watcherNodeSync(ZkClient zk) throws InterruptedException, KeeperException { /** * 参数一:监听节点信息 * 参数二:回调函数 */ zk.subscribeChildChanges("/", (arg0, arg1) -> { System.err.println("子节点发生变化:" + arg0); arg1.forEach(f -> { System.out.println("content:" + f); }); }); Thread.sleep(Long.MAX_VALUE); }
/** * 监听数据 */ private static void watcherDataSync(ZkClient zk) throws InterruptedException, KeeperException { /** * 参数一:监听节点名 * 参数二:回调函数 */ zk.subscribeDataChanges("/root", new IZkDataListener() { @Override public void handleDataDeleted(String arg0) throws Exception { System.err.println("数据删除:" + arg0); } @Override public void handleDataChange(String arg0, Object arg1) throws Exception { System.err.println("数据修改:" + arg0 + "------" + arg1); } }); Thread.sleep(Long.MAX_VALUE); }
Curator是 Netflix公司开源的一套ZooKeeper客户端框架。和ZkClient一样,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等,目前已经成为了Apache的顶级项目,是全世界范围内使用最广泛的ZooKeeper客户端之一。
<dependencies>
<!-- curator-recipes -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
</dependencies>
public static void main(String[] args) {
/**
* 参数一:zookeeper 地址,多个用逗号隔开
* 参数二:超时时间
* 参数三:重试机制
*/
CuratorFramework cur = CuratorFrameworkFactory.builder()
.connectString("ip:2181")
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
cur.start(); //连接zookeeper
}
类型 | 含义 |
---|---|
PERSISTENT | 持久化目录节点,客户端与zookeeper断开连接后,该节点依旧存在 |
PERSISTENT_SEQUENTIAL | 持久化,并带有序列号 |
EPHEMERAL | 临时目录节点,客户端与zookeeper断开连接后,该节点被删除 |
EPHEMERAL_SEQUENTIAL | 临时,并带有序列号 |
/**
* 创建节点
*/
private static void createNodeSync(CuratorFramework curatorFramework) throws Exception {
curatorFramework.create()
.withMode(CreateMode.PERSISTENT) // 节点类型:持久节点
/**
* 参数一:节点名称
* 参数二:节点数据
*/
.forPath("/root", "itxiong".getBytes());
}
读取一个节点的数据内容,同时获取到该节点的stat
/**
* 获取节点数据
*/
private static void getDataSync(CuratorFramework cur) throws Exception {
Stat stat = new Stat();
byte[] bytes = cur.getData()
.storingStatIn(stat) // 获取节点信息状态(可省略)
.forPath("/root"); // 节点名
System.out.println(new String(bytes));
System.out.println(stat);
}
/**
* 修改节点数据
*/
private static void setDataSync(CuratorFramework cur) throws Exception {
Stat stat = cur.setData()
.withVersion(10086) // 指定修改的版本(可省略)
/**
* 参数一:节点名称
* 参数二:修改数据
*/
.forPath("/root", "data".getBytes());
System.out.println(stat);
}
/**
* 删除节点
*/
private static void deleteSync(CuratorFramework cur) throws Exception {
cur.delete()
.deletingChildrenIfNeeded()// 删除一个节点,并且递归删除其所有的子节点
.withVersion(10086) // 强制指定版本进行删除
.forPath("/root");
}
/**
* 获取所有节点
*/
private static void getNodesSync(CuratorFramework cur) throws Exception {
List<String> list = cur.getChildren().forPath("/");
list.forEach(System.out::println);
}
/**
* 检查节点是否存在
*/
private static void isNodeExists(CuratorFramework cur) throws Exception {
cur.checkExists().forPath("/root");
}
/** * 监听节点数据 */ private static void watcherDataSync(CuratorFramework cur) throws Exception { /** * 参数一:zookeeper对象 * 参数二:节点名 */ NodeCache nodeCache = new NodeCache(cur, "/node1"); nodeCache.getListenable().addListener(() -> { System.out.println("被修改了。。。。。"); }); // 开启监听 nodeCache.start(); Thread.sleep(Long.MAX_VALUE); }
<dependencies> <!-- Dubbo 依赖 --> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> <version>2.7.6</version> </dependency> <!-- zookeeper 注册中心 依赖 --> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-registry-zookeeper</artifactId> <version>2.7.6</version> <!--排除日志SLF4J防止冲突--> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
# 端口号 server: port: 9090 # dubbo 配置 dubbo: # 服务名配置 application: name: rpc-provider # 注册配置 registry: # 单机 address: zookeeper://ip:2181 # 集群 # address: zookeeper://0.0.0.0:2181?backup=127.0.0.1:2182,192.168.0.1:2183 # 超时时间 timeout: 50000 # 协议端口 protocol: name: dubbo port: 20880 # 注册文件位置 scan: base-packages: com.xl.service.impl
- @Service
import org.apache.dubbo.config.annotation.Service;
@Service参数 | 含义 | 实例 |
---|---|---|
timeout | 设置超时机制,超时时间(ms) | @Service(timeout = 3000) //当前服务3秒超时 |
retries | 设置超时重试机制,重试次数 | @Service(timeout = 3000,retries = 2) |
version | 设置服务版本,x.x.x | @Service(version = “2.0.0”) |
loadbalance | 设置负载均衡方式 | @Service(timeout = 3000,retries = 3,loadbalance = “roundrobin”) |
executes | 服务限流-并发控制,最大并发数 | @Service(executes = 10) |
actives | 服务限流-连接控制,最大连接 | @Service(actives= 10) |
参数 | 含义 |
---|---|
random | 随机负载均衡,随机的选择一个,默认负载均衡。 |
roundrobin | 轮询负载均衡。 |
leastactive | 最少活跃调用数,相同活跃数的随机。 |
consistenthash | 一致性哈希负载均衡,相同参数的请求总是落在同一台机器上。 |
@Service
public class OrderServiceImpl implements IOrderDubboService {
/**
* 业务内容
*/
}
- @Reference
import org.apache.dubbo.config.annotation.Reference;
@Reference参数 | 含义 | 实例 |
---|---|---|
设置超时机制,超时时间(ms) | @Reference(timeout = 2000)// 远程注入,一方设置即可 | |
version | 设置使用的服务版本,x.x.x | @Reference(version = “2.0.0”) |
loadbalance | 设置负载均衡方式 | @Reference(timeout = 2000,loadbalance = “roundrobin”) |
cluster | 集群容错 | @Reference(cluster = “failover”) |
mock | 服务降级 | @Reference(timeout = 2000,mock = “force:return null”) |
cache | 缓存机制 | @Reference(cache = “lru”) //结果缓存 |
参数 | 含义 |
---|---|
random | 随机负载均衡,随机的选择一个,默认负载均衡。 |
roundrobin | 轮询负载均衡。 |
leastactive | 最少活跃调用数,相同活跃数的随机。 |
consistenthash | 一致性哈希负载均衡,相同参数的请求总是落在同一台机器上。 |
容错模式 | 含义 |
---|---|
failover | 失败重试。默认值。一般用于读操作。 |
failfast | 快速失败,只发起一次调用,失败立即报错。通常用于写操作。 |
failsafe | 失败安全,出现异常时,直接忽略。返回一个空结果。日志不重要操作。 |
failback | 失败自动恢复,后台记录失败请求,定时重发。非常重要的操作。 |
forking | 并行调用多个服务器,只要有一个成功即返回。 |
broadcast | 广播调用所有提供者,逐个调用,任意一台报错则报错。 同步要求高的可以使用这个模式。 |
降级方式 | 含义 |
---|---|
mock=force:return null | 表示消费方对该服务的方法调用都直接返回null值,不发起远程调用。 |
mock=fail:return null | 表示消费方对该服务的方法调用在失败后,再返回null值,不抛异常。 |
缓存方式 | 意义 |
---|---|
lru | 保持最新 |
threadlocal | 当前线程缓存 |
jcache | 桥接缓存 |
@Service public class UserServiceImpl implements IUserService { //引入订单服务 @Reference IOrderDubboService iOrderService; /** * 根据用户id查询订单 * @param id 用户id * @return */ @Override public CommonResult<Order> findByUserId(Long id) { CommonResult commonResult = new CommonResult(); commonResult.setCode(200); commonResult.setMessage("查询成功"); //远程调用 CommonResult<Order> orderCommonResult = iOrderService.findByuserId(id); commonResult.setData(orderCommonResult); return commonResult; } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。