ZooKeeper is a distributed service-management framework designed around the observer pattern. It stores and manages the data that all participants care about and accepts registrations from observers; once the state of that data changes, ZooKeeper notifies every observer registered on it so that they can react accordingly.
A ZooKeeper cluster consists of one leader and multiple followers.
As long as more than half of the nodes in the cluster are alive, ZooKeeper can keep serving normally, which is why ZooKeeper is best deployed on an odd number of servers.
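For example, a five-server ensemble still has a majority with two servers down (3 out of 5), while a six-server ensemble can likewise tolerate only two failures, so the sixth server adds no extra fault tolerance.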
Global data consistency: every server keeps an identical copy of the data, so a client sees the same data no matter which server it connects to.
Ordered updates: update requests from the same client are applied in the order in which they were sent.
Atomic updates: a data update either succeeds completely or fails completely.
Timeliness: within a bounded time window, a client can read the latest data.
The structure of ZooKeeper's data model resembles a Unix file system: the whole thing can be viewed as a tree, and each node in the tree is called a ZNode. Each ZNode stores at most 1 MB of data by default, and every ZNode is uniquely identified by its path.
The services it provides include a unified naming service, unified configuration management, unified cluster management, dynamic online/offline of server nodes, load balancing, and more:
Unified naming service
Unified configuration management
Unified cluster management
Dynamic online/offline of server nodes
Load balancing
Download address: Alibaba Cloud mirror
Copy the archive onto the Linux machine and extract it:
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
Rename /zookeeper/conf/zoo_sample.cfg to /zookeeper/conf/zoo.cfg.
Edit zoo.cfg and change the data directory to a path of your own, so the data is not kept in the tmp folder:
dataDir=/usr/local/zookeeper-3.7.1/zkData
./zkServer.sh start
./zkCli.sh
[root@localhost zookeeper-3.7.1]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper-3.7.1/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
#tickTime: basic heartbeat interval between the ZooKeeper server (leader/followers) and its clients, in milliseconds
tickTime=2000
#initLimit: maximum number of tickTime intervals allowed for the initial leader-follower (LF) connection and sync
initLimit=10
#syncLimit: if a follower does not respond within syncLimit * tickTime, the leader considers it dead and removes it from the ensemble
syncLimit=5
#directory where ZooKeeper stores its data
dataDir=/usr/local/zookeeper-3.7.1/zkData
#client connection port
clientPort=2181
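With the values above, a follower gets at most initLimit × tickTime = 10 × 2000 ms = 20 s to finish its initial connection and sync with the leader, and is treated as dead if a heartbeat exchange takes longer than syncLimit × tickTime = 5 × 2000 ms = 10 s.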
./zkServer.sh start
./zkServer.sh status
./zkServer.sh stop
./zkServer.sh restart
./zkCli.sh -server ip:port
quit    #run inside the zkCli shell to exit the client
[zk: localhost:2181(CONNECTED) 1] ls /
[zookeeper]
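The zookeeper node listed here always exists; it is reserved for ZooKeeper's internal bookkeeping (quota information, for example, lives under /zookeeper/quota).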
[zk: localhost:2181(CONNECTED) 5] create /app1 test #create node app1 with data "test"
Created /app1
[zk: localhost:2181(CONNECTED) 6] ls /
[app1, zookeeper]
#read the data stored in the node
[zk: localhost:2181(CONNECTED) 9] get /app1
test
[zk: localhost:2181(CONNECTED) 10] create /app2
Created /app2
[zk: localhost:2181(CONNECTED) 12] set /app2 test2
[zk: localhost:2181(CONNECTED) 15] get /app2
test2
[zk: localhost:2181(CONNECTED) 16] delete /app1
[zk: localhost:2181(CONNECTED) 17] ls /
[app2, zookeeper]
deleteall /app1    #delete the node together with all of its children
help    #list the available commands
#Ephemeral node: lives only until the current session ends
[zk: localhost:2181(CONNECTED) 3] create -e /app1
Created /app1
[zk: localhost:2181(CONNECTED) 4] ls /
[app1, app2, zookeeper]
#Sequential node: each create appends an increasing sequence number, so several can coexist
[zk: localhost:2181(CONNECTED) 5] create -s /app3
Created /app30000000003
[zk: localhost:2181(CONNECTED) 6] ls /
[app1, app2, app30000000003, zookeeper]
[zk: localhost:2181(CONNECTED) 7] create -s /app3
Created /app30000000004
[zk: localhost:2181(CONNECTED) 8] create -s /app3
Created /app30000000005
[zk: localhost:2181(CONNECTED) 9] ls /
[app1, app2, app30000000003, app30000000004, app30000000005, zookeeper]
#Create an ephemeral sequential node
[zk: localhost:2181(CONNECTED) 10] create -es /app3
Created /app30000000006
[zk: localhost:2181(CONNECTED) 11] ls /
[app1, app2, app30000000003, app30000000004, app30000000005, app30000000006, zookeeper]
#Close the session and reopen it: all ephemeral nodes are gone
[zk: localhost:2181(CONNECTED) 0] ls /
[app2, app30000000003, app30000000004, app30000000005, zookeeper]
[zk: localhost:2181(CONNECTED) 5] ls -s /app2
[]
cZxid = 0x9
ctime = Sat May 13 07:15:26 PDT 2023
mZxid = 0xa
mtime = Sat May 13 07:15:38 PDT 2023
pZxid = 0x9
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0
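For reference: cZxid and mZxid are the transaction ids of the node's creation and of its last modification (ctime and mtime are the matching timestamps), dataVersion counts how many times the node's data has been set, ephemeralOwner holds the owning session id for an ephemeral node (0x0 for a persistent one), dataLength is the size of the stored data in bytes, and numChildren is the number of direct children.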
Curator is a Java client library for Apache ZooKeeper.
Common Maven dependencies:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
        <!-- <scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
        <version>1.18.24</version>
    </dependency>
    <!-- logback, so that log output is visible -->
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.2.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>
@Test
public void testConnect(){
    //retry policy: base sleep of 3 s, up to 10 retries
    ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
    //Option 1
    //zkServer address and port (separate multiple servers with commas), session timeout (ms), connection timeout (ms), retry policy
    // CuratorFramework client = CuratorFrameworkFactory.newClient(
    //         "192.168.230.1:2181",
    //         60 * 1000,
    //         15 * 1000,
    //         retry);
    // client.start();
    //Option 2
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("192.168.230.1:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retry)
            .namespace("zkServer") //namespace: everything this client does is rooted at this node
            .build();
    client.start();
}
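Because of .namespace("zkServer"), every path this client uses is silently prefixed with /zkServer on the server: client.create().forPath("/app1") actually creates /zkServer/app1, which is exactly what the zkCli output further below shows.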
Refactoring the connection setup:
CuratorFramework client = null;

@Before
public void connect(){
    ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
    client = CuratorFrameworkFactory.builder()
            .connectString("192.168.230.1:2181")
            .sessionTimeoutMs(60 * 1000)
            .connectionTimeoutMs(15 * 1000)
            .retryPolicy(retry)
            .namespace("zkServer") //namespace: everything this client does is rooted at this node
            .build();
    client.start();
}

/**
 * release resources
 */
@After
public void close(){
    if(client != null){
        client.close();
    }
}
/**
 * create nodes
 */
@Test
public void create() throws Exception {
    //1. basic create: no data given, so the current client's IP is stored as the data
    String s1 = client.create().forPath("/app1"); // /zkServer/app1
    System.out.println(s1);
    //2. create with data
    String s2 = client.create().forPath("/app2","data hello".getBytes());
    System.out.println(s2);
    //3. create an ephemeral node: the type is specified with withMode(CreateMode.EPHEMERAL)
    String s3 = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
    System.out.println(s3);
    //set a breakpoint and check in zkCli: app3 is visible while the session is alive; once the program (session) ends, app3 is removed
    //4. create a multi-level node: creatingParentsIfNeeded() creates missing parent nodes
    String s4 = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
    System.out.println(s4);
}
[zk: localhost:2181(CONNECTED) 13] ls /
[zookeeper]
#1. basic create
[zk: localhost:2181(CONNECTED) 14] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 15] ls /zkServer
[app1]
[zk: localhost:2181(CONNECTED) 16] get /zkServer/app1
192.168.230.10
#2. create with data
[zk: localhost:2181(CONNECTED) 17] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 18] ls /zkServer
[app1, app2]
[zk: localhost:2181(CONNECTED) 19] get /zkServer/app2
data hello
#3. ephemeral node
[zk: localhost:2181(CONNECTED) 20] ls /
[zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 21] ls /zkServer
[app1, app2]
#while the debug session is still open
[zk: localhost:2181(CONNECTED) 22] ls /zkServer
[app1, app2, app3]
#after the debug session is closed
[zk: localhost:2181(CONNECTED) 23] ls /zkServer
[app1, app2]
#4. multi-level node
[zk: localhost:2181(CONNECTED) 24] ls /zkServer
[app1, app2, app4]
[zk: localhost:2181(CONNECTED) 25] ls /zkServer/app4
[p1]
[zk: localhost:2181(CONNECTED) 26]
/**
 * query nodes
 */
@Test
public void query() throws Exception {
    //1. read a node's data
    byte[] bytes = client.getData().forPath("/app1");
    System.out.println(new String(bytes)); //192.168.230.10
    //2. list child nodes
    List<String> list = client.getChildren().forPath("/");
    list.forEach(System.out::println); //app1 app2 app4
    //3. read node status
    Stat status = new Stat();
    //store the query result into the status object
    client.getData().storingStatIn(status).forPath("/app1");
    System.out.println(status); //33,33,1684034344358,1684034344358,0,0,0,0,14,0,33 (Stat.toString() just dumps the raw fields)
}
@Test
public void update() throws Exception {
    //1. plain update, but concurrent writers may overwrite each other
    // client.setData().forPath("/app1","testSet".getBytes());
    //2. CAS-style update using the version number
    Stat status = new Stat();
    client.getData().storingStatIn(status).forPath("/app1");
    int version = status.getVersion();
    System.out.println(version); //1
    //update with the version just read; if it no longer matches the server-side version, the call fails
    client.setData().withVersion(version).forPath("/app1","haha".getBytes());
}
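If another client modified /app1 after the version was read, the stored version no longer matches the server-side one and the setData() call fails (ZooKeeper reports this as a BadVersionException), so the conflicting update is detected instead of being silently overwritten.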
[zk: localhost:2181(CONNECTED) 28] ls -s /zkServer/app1
[]
cZxid = 0x21
ctime = Sat May 13 20:19:04 PDT 2023
mZxid = 0x44
mtime = Sun May 14 00:30:25 PDT 2023
pZxid = 0x21
cversion = 0
dataVersion = 2 #now 2, it was 1 above; each update increments it by 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
/**
 * delete nodes
 * @throws Exception e
 */
@Test
public void delete() throws Exception {
    //1. plain delete
    client.delete().forPath("/app1");
    //2. delete the node together with its children
    client.delete().deletingChildrenIfNeeded().forPath("/app4");
    //3. guaranteed delete: keeps retrying so a network hiccup cannot leave the node behind
    client.delete().guaranteed().forPath("/app3");
    //4. delete with a background callback via inBackground()
    client.delete().guaranteed().inBackground(new BackgroundCallback(){
        @Override
        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
            System.out.println("node deleted");
            System.out.println(curatorFramework);
            System.out.println(curatorEvent);
        }
    }).forPath("/app3");
}
node deleted
org.apache.curator.framework.imps.CuratorFrameworkImpl@1eda9cdc
CuratorEventImpl{type=DELETE, resultCode=-101, path='/app3', name='null', children=null, context=null, stat=null, data=null, watchedEvent=null, aclList=null, opResults=null}
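The resultCode of -101 is the numeric value of KeeperException.Code.NONODE: /app3 had already been removed by the earlier guaranteed delete in the same test, but the background callback is still invoked with the outcome.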
ZooKeeper lets clients register watchers on specific nodes; when certain events fire, the ZooKeeper server notifies the interested clients. This mechanism is a key feature behind ZooKeeper's distributed coordination service.
On top of the watch mechanism you can build publish/subscribe: many subscribers watch the same object, and when the object's state changes, all subscribers are notified.
ZooKeeper natively supports event listening through registered watchers, but using them directly is inconvenient and requires a fair amount of hand-written boilerplate.
Curator therefore introduces caches to listen for events from the ZooKeeper server.
Curator provides three kinds of cache-based watchers: NodeCache, PathChildrenCache, and TreeCache.
/**
 * register a listener on a specific node
 */
@Test
public void NodeCacheTest() throws Exception {
    //1. create the NodeCache object
    NodeCache nodeCache = new NodeCache(client, "/app1");
    //2. register the listener
    nodeCache.getListenable().addListener(new NodeCacheListener() {
        @Override
        public void nodeChanged() throws Exception {
            System.out.println("node changed");
            //react to the change here: read the node's data after the modification
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
        }
    });
    //3. start listening; true means the cached data is loaded immediately
    nodeCache.start(true);
    while (true){
    }
}
/**
 * listen to all direct children of a node: only the children, not grandchildren and not the node itself
 */
@Test
public void PathChildrenCacheTest() throws Exception {
    //true: cache the child data along with the state
    PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
    //bind the listener
    pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            System.out.println("child node changed");
            System.out.println(curatorFramework);
            System.out.println(pathChildrenCacheEvent);
            //watch for child data updates
            PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
            //we only care about data updates here
            if(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                //getData() returns the child's ChildData; getData().getData() returns the child's updated bytes
                byte[] data = pathChildrenCacheEvent.getData().getData();
                System.out.println(new String(data));
            }
        }
    });
    pathChildrenCache.start();
    while (true);
}
Create a child node, then modify it:
[zk: localhost:2181(CONNECTED) 47] create /zkServer/app2/p2
Created /zkServer/app2/p2
#console output
child node changed
org.apache.curator.framework.imps.WatcherRemovalFacade@d013913
PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/app2/p2', stat=101,101,1684052651069,1684052651069,0,0,0,0,0,0,101, data=null}}
[zk: localhost:2181(CONNECTED) 49] set /zkServer/app2/p2 aaa
#console output
child node changed
org.apache.curator.framework.imps.WatcherRemovalFacade@d013913
PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p2', stat=101,103,1684052651069,1684052677940,1,0,0,0,3,0,101, data=[97, 97, 97]}}
aaa    #printed because the if branch only reacts to CHILD_UPDATED events
/**
 * listen to the node itself and to its children
 */
@Test
public void TreeCacheTest() throws Exception {
    //1. create the TreeCache
    TreeCache treeCache = new TreeCache(client, "/app2");
    //2. register the listener
    treeCache.getListenable().addListener(new TreeCacheListener() {
        @Override
        public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
            System.out.println("node or child changed");
            System.out.println(curatorFramework);
            System.out.println(treeCacheEvent);
            //react to data changes in the subtree here
        }
    });
    //3. start listening
    treeCache.start();
    while (true);
}
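Unlike PathChildrenCache, TreeCache keeps data for the node itself and for its whole subtree, so the listener fires for changes at any depth under /app2, not just for direct children.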
Besides implementing distributed locks with MySQL or Redis, it is now common to implement them with ZooKeeper.
**Core idea:** use ephemeral sequential nodes. When a client needs the lock it creates such a node (no two clients can create the same node), and when it is done with the lock it deletes the node. The steps are as follows (a minimal code sketch follows the list):
When a client wants to acquire the lock, it creates an ephemeral sequential node under the lock node.
It then lists all children of lock; if the node it created has the smallest sequence number, the client is considered to hold the lock. When it is done using the lock, it deletes its node.
If its node is not the smallest, the client has not acquired the lock yet; it finds the node immediately smaller than its own and registers a watcher for that node's deletion event.
When that smaller node is deleted, the client's watcher is notified; the client then checks again whether its own node is now the smallest in the lock sequence. If it is, it holds the lock; if not, it repeats the previous step.
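The four steps above map almost directly onto the raw org.apache.zookeeper API. Below is a minimal sketch, for illustration only; Curator's InterProcessMutex, used in the example further down, packages the same recipe. The class name SimpleZkLock, the /lock parent node, and the seq- node prefix are assumptions made for this sketch, not part of any library.

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SimpleZkLock {

    private static final String LOCK_ROOT = "/lock"; // persistent parent node, assumed to already exist
    private final ZooKeeper zk;
    private String myNode; // e.g. /lock/seq-0000000007

    public SimpleZkLock(ZooKeeper zk) {
        this.zk = zk;
    }

    public void acquire() throws Exception {
        // step 1: create an ephemeral sequential node under /lock
        myNode = zk.create(LOCK_ROOT + "/seq-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            // step 2: list and sort all children; the smallest one holds the lock
            List<String> children = zk.getChildren(LOCK_ROOT, false);
            Collections.sort(children);
            String mine = myNode.substring(LOCK_ROOT.length() + 1);
            int index = children.indexOf(mine);
            if (index == 0) {
                return; // our node is the smallest -> lock acquired
            }
            // step 3: watch the node immediately before ours for its deletion event
            String previous = LOCK_ROOT + "/" + children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(previous, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });
            // step 4: once the predecessor is gone, loop and re-check whether we are now the smallest
            if (stat != null) {
                latch.await();
            }
        }
    }

    public void release() throws Exception {
        zk.delete(myNode, -1); // -1 = ignore the version check
    }
}

Because every waiter watches only its immediate predecessor rather than the /lock node itself, only one client wakes up when the lock is released, which avoids the herd effect.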
Curator ships five lock recipes:
Writing an example:
/**
 * @author 我见青山多妩媚
 * @date 2023/5/14 0014 17:04
 * @Description TODO
 */
public class Ticket12306 implements Runnable{

    //number of tickets left
    private int tickets = 10;
    //distributed lock
    private InterProcessMutex lock;

    public Ticket12306(){
        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(3000,10);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.230.1:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .build();
        client.start();
        lock = new InterProcessMutex(client,"/lock");
    }

    @Override
    public void run() {
        while (true){
            //try to acquire the lock, waiting at most 3 s
            try {
                lock.acquire(3, TimeUnit.SECONDS);
                if(tickets > 0){
                    System.out.println(Thread.currentThread().getName()+":"+tickets);
                    tickets--;
                }else {
                    break;
                }
            }catch (Exception e){
                e.printStackTrace();
            } finally {
                try {
                    //release the lock
                    lock.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
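Note that acquire(3, TimeUnit.SECONDS) returns a boolean indicating whether the lock was actually obtained within the timeout; the example above ignores it. A stricter version would only touch tickets, and only call release(), when acquire returned true, because release() throws an IllegalMonitorStateException if the thread does not own the lock.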
/**
 * @author 我见青山多妩媚
 * @date 2023/5/14 0014 16:42
 * @Description TODO
 */
public class LockTest {
    public static void main(String[] args) {
        Ticket12306 ticket = new Ticket12306();
        //create the client threads
        Thread t1 = new Thread(ticket, "携程");
        Thread t2 = new Thread(ticket, "飞猪");
        t1.start();
        t2.start();
    }
}
It runs without problems; now let's look at the server from zkCli:
[zk: localhost:2181(CONNECTED) 53] ls /
[lock, zkServer, zookeeper]
[zk: localhost:2181(CONNECTED) 54] ls /lock
[_c_780e8659-46bd-44fa-ae1a-c6d63df66a76-lock-0000001543, _c_a1d87292-26c3-452c-b02b-f9c4cf19711e-lock-0000001542]
Here the node ending in ...1542 is sure to acquire the lock before the one ending in ...1543, because its sequence number is smaller.
When it is unused for a long time, the lock node is deleted automatically.