当前位置:   article > 正文

Zookeeper_zkcli.sh

zkcli.sh

一.安装

1.安装地址可见zookeeper官网:Apache ZooKeeper

2.本地安装:

        注意:(1)我们这里使用的是sunlight,sunlight2,sunlight3

                   (2)安装前请确保三台服务器有jdk环境,我们这里的jdk版本是1.8.0

(0)linux下jdk1.8的安装方法:

此链接可参考:        Linux 下安装JDK1.8 - 法号阿兴 - 博客园

        a.oracle下找到对应的jdk在linux下的安装包并下载或找本地资源

        b.传输到linux中并解压

        c.配置环境变量

                i.vim /etc/profile       

                ii.在此文件最底部进行如下图的三行配置!

                iii.保存退出:wq

                iiii.更新配置:source /etc/profile

        d.测试:java -version

        

如此一来,jdk在linux下的环境就成功了!

 (1)安装zookeeper:

        a.找到对应版本的zookeeper传输到linux

        b.解压

        c.由于名字有些长,为了方便,改一下名字:

 (2)配置修改:

        a. /opt/software/zookeeper-3.5.7/conf下的zoo_sample.cfg就是zookeeper的配置文件,zoo_sample.cfg是带有样例的意思,我们要把它改为zoo.cfg用于配置zookeeper信息

         b.初步查看zookeeper的配置信息:vim zoo.cfg

配置信息如下,未截屏的都是注释!

 由上图可知:默认情况下zookeeper的存储数据都默认保存在/tmp/zookeeper下。我们有知道linux下/tmp目录的存储信息是临时数据,隔一个月左右自动删除,因此我们这里要做修改!自己指定zookeeper数据的存储目录

         c.自己指定zookeeper数据的存储目录:

                 i.创建目录存储zookeeper信息,我们一般把那个目录叫做zkData

                        注意:这个目录并非一定要在zookeeper下,这是为了方便我这样做。

                 ii.修改zookeeper配置:vim zoo.cfg

注意:集群状况或启动tomcat的情况下。zookeeper部署后, 3.5以后的版本, 会自动占用8080端口. 需要修改配置文件更改服务器端口。否则zk服务器启动不起来。

        vim zoo.cfg

 (3)操作zookeeper--启动测试

         a.启动zookeeper服务端:

                

        b.查看进程是否启动:

         c.查看状态:

         d.启动客户端:

./zkCli.sh:开启本服务器的客户端

./zkCli.sh -server 192.168.23.150:开启其他服务器的客户端:2181

         e.简单使用:

         f.关闭客户端:

         g.关闭服务器:

 (4)zookeeper下conf下zoo.cfg配置信息的参数详解:

        Zookeeper中的配置文件 zoo.cfg 中参数含义解读如下:
        

                a.tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒:

                 b.initLimit = 10LF初始通信时限:

Leader Follower 初始连接 时能容忍的最多心跳数( tickTime 的数量)

                 c.syncLimit = 5LF同步通信时限

Leader Follower 之间通信时间如果超过 syncLimit * tickTime Leader 认为 Follwer
掉,从服务器列表中删除 Follwer

                 d.dataDir保存Zookeeper中的数据

        注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。

                e.clientPort = 2181:客户端连接端口,通常不做修改。

二.Zookeeper集群操作

        我们这里使用的是sunlight,sunlight2,sunlight3,在安装-本地安装的基础上进行配置集群。

1.集群安装

(0)我们这里的集群安装是一台服务器一台服务器操作的。有空学习集群脚本操作,haddoop相关

(1)我们这里使用的是sunlight,sunlight2,sunlight3,在安装-本地安装的基础上进行配置集群。

(2)配置服务器编号:

        在/opt/software/zookeeper-3.5.7/zkkData下vim myid:

注意:上下不要有空行,左右不要有空格、
注意:添加 myid 文件,一定要在 Linux 里面创建,在 notepad++ 里面很可能乱码

sunlight为10, sunlight2为2,sunlight3为3

(3)配置zoo.cfg文件(必须的,否则会出错)

注意:若使用主机名替代IP地址,要进行如下配置:vim /etc/hosts

        a.打开 zoo.cfg 文件

        b.vim zoo.cfg,在文件底部添加:三天服务器此配置一模一样

使用上面的较好。后面的2181是默认的clientPort = 2181配置

  1. server.2=192.168.23.150:2888:3888
  2. server.3=192.168.23.160:2888:3888
  3. server.10=192.168.23.130:2888:3888
A 是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件 myid ,这个文件在 dataDir 目录下,这个文件里面有一个数据
就是 A 的值, Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比
较从而判断到底是哪个 server
B 是这个服务器的地址;
C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的
Leader ,而这个端口就是用来执行选举时服务器相互通信的端口。

(4)测试集群

a.三台服务器分别启动:

cd /opt/software/zookeeper/bin

./zkServer.sh start

b.查看状态:

./zkServer.sh status

 

 

2.选举机制(面试重点):

(1)第一次启动时的选举

(2)非第一次启动的选举

 leade相当于皇上,皇上宕机选新leaderr,epoch是几朝元老的意思

3.zookeeper集群启动/停止脚本

我们这里的集群安装是一台服务器一台服务器操作的。有空学习集群脚本操作,haddoop相关

脚本如下:

        (1)里面是默认配置了免密登录的。若添加如下脚本后使用失败,考虑免密登录是否配置。

        (2)脚本我们一般放在用户的家目录下:

         (3)在上述目录下,编辑一个.sh结尾的文件用于存放zookeeper启动/停止的脚本

        (4)并给予权限,让它可执行:chmod 777 zk.sh

  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  5. do
  6. echo ---------- zookeeper $i 启动 ------------
  7. ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh start"
  8. done
  9. };;
  10. "stop"){
  11. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  12. do
  13. echo ---------- zookeeper $i 停止 ------------
  14. ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh stop"
  15. done
  16. };;
  17. "status"){
  18. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  19. do
  20. echo ---------- zookeeper $i 状态 ------------
  21. ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh status"
  22. done
  23. };;
  24. esac

上面的有方法不在上述代码里配置免密登录。

下面是在代码里配置了免密登录的:

        前提是安装了:yum install sshpass

        解释:

        a.-o StrictHostKeyChecking=no是跳过密码提示的,不加会没反应

        b.sshpass -p 201315的201315即为密码

  1. #!/bin/bash
  2. case $1 in
  3. "start"){
  4. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  5. do
  6. echo ---------- zookeeper $i 启动 ------------
  7. sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh start"
  8. done
  9. };;
  10. "stop"){
  11. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  12. do
  13. echo ---------- zookeeper $i 停止 ------------
  14. sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh stop"
  15. done
  16. };;
  17. "status"){
  18. for i in 192.168.23.130 192.168.23.150 192.168.23.160
  19. do
  20. echo ---------- zookeeper $i 状态 ------------
  21. sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh status"
  22. done
  23. };;
  24. esac

        上述配置之后,zookeeper服务器的启动在三台服务器上就可以集群操作了,zookeeper客户端的启动未配置集群操作,需要自己手动开启(./zkCli.sh)。

4.客户端命令行操作

        (1)命令行语法

                a.连接客户端后显示所有操作指令:help 

 

        (2)znode节点数据信息

                a.查看当前znode中所包含的内容

        

                 b.查看当前节点详细数据

                 

        (3)节点类型

                节点类型:

                 a.分别创建2个普通节点(永久节点 + 不带序号)

                        i.创建1级数据

                         ii.创建2级数据

                b.获得节点的值

 

            c.创建带序号的节点(永久节点 + 带序号)

                    i.先创建一个普通的根节点/sanguo/weiguo

                    ii.创建带序号的永久节点

 

 对比上图和下图:可发现带序号的节点,数据相同也可以重复创建,不带序号的不能重复创建

        d.创建短暂节点(短暂节点+不带序号/带序号)

                i.创建短暂的不带序号的节点

                ii.创建短暂的带序号的节点

       

                iii.在当前客户端是能查看到的

                iiii.退出当前客户端然后再重启客户端,发现之前创建的        /sanguo/wuguo        中的数据没了

         e.修改节点数据值

                i.修改前的值:

                 ii修改操作及修改后的值

        (4)监听器原理:

        客户端注册监听它关心的目录节点,当目录节点发生变化(数据改变、节点删除、子目
录节点增加删除)时, ZooKeeper 会通知客户端。监听机制保证 ZooKeeper 保存的任何的数
据的任何改变都能快速的响应到监听了该节点的应用程序。

                 a.节点的值变化监听

                        get -w:注册监听器

                        i.在sunlight3中监听了 /sanguo 里的值:

                         ii.sunlight2中修改了 /sanguo 里的值:

        

                        iii.此时我们再看sunlight3里的反应:

注意:在 sunlight2 再多次修改 /sanguo 的值, sunlight3 上不会再收到监听。因为注册
一次,只能监听一次。想再次监听,需要再次注册。

        

                b.节点的子节点变化监听(路径变化)

                        ls -w:注册监听器

                         i.在sunlight3中监听 /sanguo 节点里的子节点的变化:

                         ii.sunlight2中创建子节点

                         iii.此时我们再看sunlight3里的反应:

 (5)节点删除与查看

        a.删除无子节点的节点:

                delete /sanguo/jin

        b.删除有子节点的节点(递归删除节点):

                deleteall /sanguo

         c.查看节点状态:

三.JavaApi操作Zookeeper

1.简单测试

        (1)导入pom依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>junit</groupId>
  4. <artifactId>junit</artifactId>
  5. <version>RELEASE</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.logging.log4j</groupId>
  9. <artifactId>log4j-core</artifactId>
  10. <version>2.8.2</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.projectlombok</groupId>
  14. <artifactId>lombok</artifactId>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.apache.zookeeper</groupId>
  18. <artifactId>zookeeper</artifactId>
  19. <version>3.5.7</version>
  20. </dependency>
  21. </dependencies>

        (2)application.properties:进行slf4j的配置

  1. log4j.rootLogger=INFO, stdout
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=target/spring.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

        (3)测试

  1. package com.example.zookeeper;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import org.junit.Test;
  7. @Slf4j
  8. public class ZkClient {
  9. // IP地址也可以写主机名称,当然要在windows的hosts文件中写上映射。
  10. // 注意:逗号前后不能有空格。
  11. private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
  12. private int sessionTimeout = 2000;
  13. @Test
  14. public void init() throws Exception{
  15. ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  16. @Override
  17. public void process(WatchedEvent watchedEvent) {
  18. }
  19. });
  20. }
  21. }

        (4)测试结果如下:

2.创建子节点,在1的基础上进行:

(1)执行类中的create()方法

  1. package com.example.zookeeper;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.zookeeper.*;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. @Slf4j
  7. public class ZkClient {
  8. // IP地址也可以写主机名称,当然要在windows的hosts文件中写上映射。
  9. // 注意:逗号前后不能有空格。
  10. private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
  11. private int sessionTimeout = 2000;
  12. private ZooKeeper zkClient;
  13. // 初始化--测试连接。测试之后为了配合下面的使用,
  14. // 把@Test注释掉,新增注释@Before,表示在任何方法执行前执行此方法,进行zookeeper的初始化
  15. // @Test
  16. @Before
  17. public void init() throws Exception{
  18. zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  19. @Override
  20. public void process(WatchedEvent watchedEvent) {
  21. }
  22. });
  23. }
  24. // 创建子节点
  25. @Test
  26. public void create() throws KeeperException, InterruptedException {
  27. String nodeCreated =
  28. zkClient.create("/school","classone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  29. System.out.println("结果:"+nodeCreated);
  30. }
  31. }

(2)运行create()的结果

 (3)查看zookeeper中的信息:

3.监听节点变化:实际操作时碰到了些问题

4.判断节点是否存在:见springboot-demo

5.客户端向服务端写数据流程

四.服务器动态上下线监听案例

1.需求:

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知
到主节点服务器的上下线。

2.需求分析

 3.具体实现

(1)先在集群上创建/servers 节点

(2)服务器注册:创建包com.example.zookeeper.case1,创建类DistributeServer

  1. package com.example.zookeeper.case1;
  2. import org.apache.zookeeper.*;
  3. // 服务端向zookeeper注册
  4. public class DistributeServer {
  5. private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
  6. // 连接需要时间,时间设长一点
  7. private int sessionTimeout = 200000;
  8. private ZooKeeper zk;
  9. public static void main(String[] args) throws Exception {
  10. DistributeServer server = new DistributeServer();
  11. // 1.获取zk连接
  12. server.getConnect();
  13. // 2.注册服务器到zk集群
  14. String a = "192.168.23.150";
  15. server.regist(a);
  16. // 3.启动业务逻辑(睡觉)。这里业务逻辑就不写了,sleep一会代表业务逻辑的运行。
  17. server.business();
  18. }
  19. private void business() throws Exception{
  20. Thread.sleep(Long.MAX_VALUE);
  21. }
  22. private void regist(String hostip) throws Exception {
  23. String create =
  24. zk.create("/servers/" + hostip, hostip.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  25. System.out.println(hostip + " is online");
  26. }
  27. private void getConnect() throws Exception {
  28. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  29. @Override
  30. public void process(WatchedEvent watchedEvent) {
  31. }
  32. });
  33. }
  34. }

(3)客户端监听:包com.example.zookeeper.case1下创建类DistributeClient

  1. package com.example.zookeeper.case1;
  2. import org.apache.zookeeper.WatchedEvent;
  3. import org.apache.zookeeper.Watcher;
  4. import org.apache.zookeeper.ZooKeeper;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. public class DistributeClient {
  8. private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
  9. // 连接需要时间,时间设长一点
  10. private int sessionTimeout = 200000;
  11. private ZooKeeper zk;
  12. public static void main(String[] args) throws Exception {
  13. DistributeClient client = new DistributeClient();
  14. // 1.获取zk连接
  15. client.getConnect();
  16. // 2.监听/servers下面子节点的增加和删除
  17. client.getServerList();
  18. // 3.启动业务逻辑(睡觉)。这里业务逻辑就不写了,sleep一会代表业务逻辑的运行。
  19. client.business();
  20. }
  21. private void business() throws InterruptedException {
  22. Thread.sleep(Long.MAX_VALUE);
  23. }
  24. public void getServerList() throws Exception {
  25. // watch:true 意味着会走
  26. // new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});方法
  27. // 监听注册只生效一次,让它一直生效,需要在getConnect()中的process(WatchedEvent watchedEvent)方法里再次调用
  28. List<String> children = zk.getChildren("/servers", true);
  29. ArrayList<String> servers = new ArrayList<>();
  30. for (String child : children) {
  31. byte[] data = zk.getData("/servers/" + child, false, null);
  32. servers.add(new String(data));
  33. }
  34. System.out.println(servers);
  35. }
  36. private void getConnect() throws Exception {
  37. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  38. @Override
  39. public void process(WatchedEvent watchedEvent) {
  40. try {
  41. getServerList();
  42. } catch (Exception e) {
  43. e.printStackTrace();
  44. }
  45. }
  46. });
  47. }
  48. }

(4)测试

        a.启动DistributeClient并查看DistributeClient控制台

         解释:第一个[]是getServerList()方法的,第二个[]是getConnect()的process(WatchedEvent watchedEvent)功劳。同时getConnect()的process(WatchedEvent watchedEvent)方法还会在每次节点变化的时候都调用一次。

        b.在192.168.23.130上zk的客户端/servers 目录上创建临时带序号节点

         c.观察此刻idea DistributeClient控制台变化

         d.在192.168.23.130上zk的客户端/servers 目录上执行删除操作

        e.观察 Idea DistributeClient控制台变化

       上面是开启 DistributeClient在linux中操作增减服务器。

       下面是开启 DistributeServer在idea中操作增减服务器。

        f.下面启动DistributeServer并观察 Idea DistributeServer控制台变化

         g.查看Idea DistributeClient控制台变化

         h.修改内部参数,再开启一台DistributeServer并观察 Idea DistributeServer控制台变化

操作前提:允许并行。 

         i.查看Idea DistributeClient控制台变化

五.Zookeeper分布式锁案例

        什么叫做分布式锁呢?
        比如说" 进程 1" 在使用该资源的时候,会先去获得锁, " 进程 1" 获得锁以后会对该资源
保持独占,这样其他进程就无法访问该资源, " 进程 1" 用完该资源以后就将锁释放掉,让其
他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的
访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。

1.原生 Zookeeper 实现分布式锁案例

(1)分布式锁实现 :创建包com.example.zookeeper.case2,创建类DistributeLock

  1. package com.example.zookeeper.case2;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import java.io.IOException;
  5. import java.util.Collections;
  6. import java.util.List;
  7. import java.util.concurrent.CountDownLatch;
  8. public class DistributeLock {
  9. private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
  10. // 连接需要时间,时间设长一点
  11. private int sessionTimeout = 200000;
  12. private ZooKeeper zk;
  13. private CountDownLatch connectLatch = new CountDownLatch(1);
  14. private CountDownLatch waitLatch = new CountDownLatch(1);
  15. private String waitPath;
  16. private String currentNode;
  17. public DistributeLock() throws Exception {
  18. // 获取连接
  19. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  20. @Override
  21. public void process(WatchedEvent watchedEvent) {
  22. // connectLatch 如果连接上zk 可以释放
  23. if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
  24. connectLatch.countDown();
  25. }
  26. // waitLatch 需要释放
  27. if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
  28. waitLatch.countDown();
  29. }
  30. }
  31. });
  32. // 等待zk正常连接后,往下走程序
  33. connectLatch.await();
  34. // 判断根节点/locks是否存在 watch:false为不开启监听
  35. Stat stat = zk.exists("/locks", false);
  36. if (stat == null) {
  37. // 创建一下根节点
  38. zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  39. }
  40. }
  41. // 对zk加锁
  42. public void zkLock() throws Exception{
  43. // 创建对应的临时带序号节点
  44. currentNode =
  45. zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  46. // wait一小会, 让结果更清晰一些
  47. Thread.sleep(10);
  48. // 判断创建的节点是否是最小的序号节点,如果是则获取到锁。如果不是,监听序号前一个节点。
  49. List<String> children = zk.getChildren("/locks", false);
  50. // 如果children只有一个值,那就直接获取锁,如果有多个节点,需要判断谁最小
  51. if (children.size() == 1) {
  52. return;
  53. }else {
  54. Collections.sort(children);
  55. // 获取节点名称 seq-00000000
  56. String thisNode = currentNode.substring("/locks/".length());
  57. // 通过seq-00000000获取该节点在children集合的位置
  58. int index = children.indexOf(thisNode);
  59. // 判断
  60. if (index == -1) {
  61. System.out.println("数据异常");
  62. } else if (index == 0) {
  63. // 就一个节点,可以获取锁了
  64. return;
  65. } else {
  66. // 需要监听 他前一个节点变化
  67. waitPath = "/locks/" + children.get(index - 1);
  68. zk.getData(waitPath,true,new Stat());
  69. // 等待监听
  70. waitLatch.await();
  71. return;
  72. }
  73. }
  74. }
  75. // 解锁
  76. public void unZkLock() throws Exception {
  77. // 删除节点
  78. zk.delete(this.currentNode,-1);
  79. }
  80. }

(1)分布式锁测试 :在包com.example.zookeeper.case2下,创建类DistributedLockTest

  1. package com.example.zookeeper.case2;
  2. import org.apache.zookeeper.KeeperException;
  3. import java.io.IOException;
  4. public class DistributedLockTest {
  5. public static void main(String[] args) throws Exception {
  6. final DistributeLock lock1 = new DistributeLock();
  7. final DistributeLock lock2 = new DistributeLock();
  8. new Thread(new Runnable() {
  9. @Override
  10. public void run() {
  11. try {
  12. lock1.zkLock();
  13. System.out.println("线程1 启动,获取到锁");
  14. Thread.sleep(5 * 1000);
  15. lock1.unZkLock();
  16. System.out.println("线程1 释放锁");
  17. } catch (Exception e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. }).start();
  22. new Thread(new Runnable() {
  23. @Override
  24. public void run() {
  25. try {
  26. lock2.zkLock();
  27. System.out.println("线程2 启动,获取到锁");
  28. Thread.sleep(5 * 1000);
  29. lock2.unZkLock();
  30. System.out.println("线程2 释放锁");
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }).start();
  36. }
  37. }

(3)观察控制台变化:

2.Curator 框架实现分布式锁案例

(1)原生的 Java API 开发存在的问题

        a.会话连接是异步的,需要自己去处理。比如使用 CountDownLatch

        b.Watch 需要重复注册,不然就不能生效
        c.开发的复杂性还是比较高的
        d.不支持多节点删除和创建。需要自己去递归
(2) Curator 是一个专门解决分布式锁的框架,解决了原生 JavaAPI 开发分布式遇到的问题。

详情见官网:Apache Curator –

(3)Curator 案例实操

        a.添加依赖

  1. <dependency>
  2. <groupId>org.apache.curator</groupId>
  3. <artifactId>curator-framework</artifactId>
  4. <version>4.3.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.curator</groupId>
  8. <artifactId>curator-recipes</artifactId>
  9. <version>4.3.0</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.curator</groupId>
  13. <artifactId>curator-client</artifactId>
  14. <version>4.3.0</version>
  15. </dependency>

        b.代码实现:

  1. package com.example.zookeeper.case3;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.locks.InterProcessMutex;
  5. import org.apache.curator.retry.ExponentialBackoffRetry;
  6. public class CuratorLockTest {
  7. public static void main(String[] args) {
  8. // 创建分布式锁1
  9. InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
  10. // 创建分布式锁2
  11. InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
  12. new Thread(new Runnable() {
  13. @Override
  14. public void run() {
  15. try {
  16. lock1.acquire();
  17. System.out.println("线程1 获取到锁");
  18. lock1.acquire();
  19. System.out.println("线程1 再次获取到锁");
  20. Thread.sleep(5 * 1000);
  21. lock1.release();
  22. System.out.println("线程1 释放锁");
  23. lock1.release();
  24. System.out.println("线程1 再次释放锁");
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }).start();
  30. new Thread(new Runnable() {
  31. @Override
  32. public void run() {
  33. try {
  34. lock2.acquire();
  35. System.out.println("线程2 获取到锁");
  36. lock2.acquire();
  37. System.out.println("线程2 再次获取到锁");
  38. Thread.sleep(5 * 1000);
  39. lock2.release();
  40. System.out.println("线程2 释放锁");
  41. lock2.release();
  42. System.out.println("线程2 再次释放锁");
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }).start();
  48. }
  49. private static CuratorFramework getCuratorFramework() {
  50. ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
  51. CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181")
  52. .connectionTimeoutMs(200000)
  53. .sessionTimeoutMs(200000)
  54. .retryPolicy(policy).build();
  55. // 启动客户端
  56. client.start();
  57. System.out.println("zookeeper 启动成功");
  58. return client;
  59. }
  60. }

        c.观察控制台

六.源码分析

1.CAP理论:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/302343
推荐阅读
相关标签
  

闽ICP备14008679号