赞
踩
注意:(1)我们这里使用的是sunlight,sunlight2,sunlight3
(2)安装前请确保三台服务器有jdk环境,我们这里的jdk版本是1.8.0
(0)linux下jdk1.8的安装方法:
此链接可参考: Linux 下安装JDK1.8 - 法号阿兴 - 博客园
a.oracle下找到对应的jdk在linux下的安装包并下载或找本地资源
b.传输到linux中并解压
c.配置环境变量
i.vim /etc/profile
ii.在此文件最底部进行如下图的三行配置!
iii.保存退出:wq
iiii.更新配置:source /etc/profile
d.测试:java -version
如此一来,jdk在linux下的环境就成功了!
(1)安装zookeeper:
a.找到对应版本的zookeeper传输到linux
b.解压
c.由于名字有些长,为了方便,改一下名字:
(2)配置修改:
a. /opt/software/zookeeper-3.5.7/conf下的zoo_sample.cfg就是zookeeper的配置文件,zoo_sample.cfg是带有样例的意思,我们要把它改为zoo.cfg用于配置zookeeper信息
b.初步查看zookeeper的配置信息:vim zoo.cfg
配置信息如下,未截屏的都是注释!
由上图可知:默认情况下zookeeper的存储数据都默认保存在/tmp/zookeeper下。我们有知道linux下/tmp目录的存储信息是临时数据,隔一个月左右自动删除,因此我们这里要做修改!自己指定zookeeper数据的存储目录
c.自己指定zookeeper数据的存储目录:
i.创建目录存储zookeeper信息,我们一般把那个目录叫做zkData
注意:这个目录并非一定要在zookeeper下,这是为了方便我这样做。
ii.修改zookeeper配置:vim zoo.cfg
注意:集群状况或启动tomcat的情况下。zookeeper部署后, 3.5以后的版本, 会自动占用8080端口. 需要修改配置文件更改服务器端口。否则zk服务器启动不起来。
vim zoo.cfg
(3)操作zookeeper--启动测试
a.启动zookeeper服务端:
b.查看进程是否启动:
c.查看状态:
d.启动客户端:
./zkCli.sh:开启本服务器的客户端
./zkCli.sh -server 192.168.23.150:开启其他服务器的客户端:2181
e.简单使用:
f.关闭客户端:
g.关闭服务器:
(4)zookeeper下conf下zoo.cfg配置信息的参数详解:
a.tickTime = 2000:通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒:
b.initLimit = 10:LF初始通信时限:
c.syncLimit = 5:LF同步通信时限
d.dataDir:保存Zookeeper中的数据
注意:默认的tmp目录,容易被Linux系统定期删除,所以一般不用默认的tmp目录。
e.clientPort = 2181:客户端连接端口,通常不做修改。
我们这里使用的是sunlight,sunlight2,sunlight3,在安装-本地安装的基础上进行配置集群。
(0)我们这里的集群安装是一台服务器一台服务器操作的。有空学习集群脚本操作,haddoop相关
(1)我们这里使用的是sunlight,sunlight2,sunlight3,在安装-本地安装的基础上进行配置集群。
(2)配置服务器编号:
在/opt/software/zookeeper-3.5.7/zkkData下vim myid:
sunlight为10, sunlight2为2,sunlight3为3
(3)配置zoo.cfg文件(必须的,否则会出错)
注意:若使用主机名替代IP地址,要进行如下配置:vim /etc/hosts
a.打开 zoo.cfg 文件
b.vim zoo.cfg,在文件底部添加:三天服务器此配置一模一样
使用上面的较好。后面的2181是默认的clientPort = 2181配置
- server.2=192.168.23.150:2888:3888
- server.3=192.168.23.160:2888:3888
- server.10=192.168.23.130:2888:3888
(4)测试集群
a.三台服务器分别启动:
cd /opt/software/zookeeper/bin
./zkServer.sh start
b.查看状态:
./zkServer.sh status
(1)第一次启动时的选举
(2)非第一次启动的选举
leade相当于皇上,皇上宕机选新leaderr,epoch是几朝元老的意思
我们这里的集群安装是一台服务器一台服务器操作的。有空学习集群脚本操作,haddoop相关
脚本如下:
(1)里面是默认配置了免密登录的。若添加如下脚本后使用失败,考虑免密登录是否配置。
(2)脚本我们一般放在用户的家目录下:
(3)在上述目录下,编辑一个.sh结尾的文件用于存放zookeeper启动/停止的脚本
(4)并给予权限,让它可执行:chmod 777 zk.sh
- #!/bin/bash
- case $1 in
- "start"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 启动 ------------
- ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh start"
- done
- };;
- "stop"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 停止 ------------
- ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh stop"
- done
- };;
- "status"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 状态 ------------
- ssh $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh status"
- done
- };;
- esac
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上面的有方法不在上述代码里配置免密登录。
下面是在代码里配置了免密登录的:
前提是安装了:yum install sshpass
解释:
a.-o StrictHostKeyChecking=no是跳过密码提示的,不加会没反应
b.sshpass -p 201315的201315即为密码
- #!/bin/bash
- case $1 in
- "start"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 启动 ------------
- sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh start"
- done
- };;
- "stop"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 停止 ------------
- sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh stop"
- done
- };;
- "status"){
- for i in 192.168.23.130 192.168.23.150 192.168.23.160
- do
- echo ---------- zookeeper $i 状态 ------------
- sshpass -p 201315 ssh -o StrictHostKeyChecking=no $i "/opt/software/zookeeper-3.5.7/bin/zkServer.sh status"
- done
- };;
- esac
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
上述配置之后,zookeeper服务器的启动在三台服务器上就可以集群操作了,zookeeper客户端的启动未配置集群操作,需要自己手动开启(./zkCli.sh)。
(1)命令行语法
a.连接客户端后显示所有操作指令:help
(2)znode节点数据信息
a.查看当前znode中所包含的内容
b.查看当前节点详细数据
(3)节点类型
节点类型:
a.分别创建2个普通节点(永久节点 + 不带序号)
i.创建1级数据
ii.创建2级数据
b.获得节点的值
c.创建带序号的节点(永久节点 + 带序号)
i.先创建一个普通的根节点/sanguo/weiguo
ii.创建带序号的永久节点
对比上图和下图:可发现带序号的节点,数据相同也可以重复创建,不带序号的不能重复创建
d.创建短暂节点(短暂节点+不带序号/带序号)
i.创建短暂的不带序号的节点
ii.创建短暂的带序号的节点
iii.在当前客户端是能查看到的
iiii.退出当前客户端然后再重启客户端,发现之前创建的 /sanguo/wuguo 中的数据没了
e.修改节点数据值
i.修改前的值:
ii修改操作及修改后的值
(4)监听器原理:
a.节点的值变化监听
get -w:注册监听器
i.在sunlight3中监听了 /sanguo 里的值:
ii.sunlight2中修改了 /sanguo 里的值:
iii.此时我们再看sunlight3里的反应:
b.节点的子节点变化监听(路径变化)
ls -w:注册监听器
i.在sunlight3中监听 /sanguo 节点里的子节点的变化:
ii.sunlight2中创建子节点:
iii.此时我们再看sunlight3里的反应:
(5)节点删除与查看
a.删除无子节点的节点:
delete /sanguo/jin
b.删除有子节点的节点(递归删除节点):
deleteall /sanguo
c.查看节点状态:
(1)导入pom依赖
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>RELEASE</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>2.8.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.7</version>
- </dependency>
- </dependencies>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(2)application.properties:进行slf4j的配置
- log4j.rootLogger=INFO, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
- log4j.appender.logfile=org.apache.log4j.FileAppender
- log4j.appender.logfile.File=target/spring.log
- log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
- log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(3)测试
- package com.example.zookeeper;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
- import org.junit.Test;
-
- @Slf4j
- public class ZkClient {
-
- // IP地址也可以写主机名称,当然要在windows的hosts文件中写上映射。
- // 注意:逗号前后不能有空格。
- private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
-
- private int sessionTimeout = 2000;
-
-
- @Test
- public void init() throws Exception{
-
- ZooKeeper zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- }
-
- });
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(4)测试结果如下:
2.创建子节点,在1的基础上进行:
(1)执行类中的create()方法
- package com.example.zookeeper;
-
- import lombok.extern.slf4j.Slf4j;
- import org.apache.zookeeper.*;
- import org.junit.Before;
- import org.junit.Test;
-
- @Slf4j
- public class ZkClient {
-
- // IP地址也可以写主机名称,当然要在windows的hosts文件中写上映射。
- // 注意:逗号前后不能有空格。
- private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
-
- private int sessionTimeout = 2000;
-
- private ZooKeeper zkClient;
-
- // 初始化--测试连接。测试之后为了配合下面的使用,
- // 把@Test注释掉,新增注释@Before,表示在任何方法执行前执行此方法,进行zookeeper的初始化
-
- // @Test
- @Before
- public void init() throws Exception{
-
- zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- }
-
- });
- }
-
- // 创建子节点
- @Test
- public void create() throws KeeperException, InterruptedException {
-
-
- String nodeCreated =
- zkClient.create("/school","classone".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-
- System.out.println("结果:"+nodeCreated);
- }
-
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(2)运行create()的结果
(3)查看zookeeper中的信息:
3.监听节点变化:实际操作时碰到了些问题
4.判断节点是否存在:见springboot-demo
5.客户端向服务端写数据流程
(1)先在集群上创建/servers 节点
(2)服务器注册:创建包com.example.zookeeper.case1,创建类DistributeServer
- package com.example.zookeeper.case1;
-
- import org.apache.zookeeper.*;
-
- // 服务端向zookeeper注册
- public class DistributeServer {
-
- private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
-
- // 连接需要时间,时间设长一点
- private int sessionTimeout = 200000;
-
- private ZooKeeper zk;
-
- public static void main(String[] args) throws Exception {
-
- DistributeServer server = new DistributeServer();
- // 1.获取zk连接
- server.getConnect();
-
- // 2.注册服务器到zk集群
- String a = "192.168.23.150";
- server.regist(a);
-
- // 3.启动业务逻辑(睡觉)。这里业务逻辑就不写了,sleep一会代表业务逻辑的运行。
- server.business();
-
- }
-
- private void business() throws Exception{
- Thread.sleep(Long.MAX_VALUE);
- }
-
- private void regist(String hostip) throws Exception {
- String create =
- zk.create("/servers/" + hostip, hostip.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- System.out.println(hostip + " is online");
- }
-
- private void getConnect() throws Exception {
-
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
-
- }
- });
- }
-
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(3)客户端监听:包com.example.zookeeper.case1下创建类DistributeClient
- package com.example.zookeeper.case1;
-
-
- import org.apache.zookeeper.WatchedEvent;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.ZooKeeper;
-
- import java.util.ArrayList;
- import java.util.List;
-
- public class DistributeClient {
-
- private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
-
- // 连接需要时间,时间设长一点
- private int sessionTimeout = 200000;
-
- private ZooKeeper zk;
-
- public static void main(String[] args) throws Exception {
-
- DistributeClient client = new DistributeClient();
-
- // 1.获取zk连接
- client.getConnect();
-
- // 2.监听/servers下面子节点的增加和删除
- client.getServerList();
-
- // 3.启动业务逻辑(睡觉)。这里业务逻辑就不写了,sleep一会代表业务逻辑的运行。
- client.business();
- }
-
- private void business() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
- }
-
- public void getServerList() throws Exception {
- // watch:true 意味着会走
- // new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {}});方法
- // 监听注册只生效一次,让它一直生效,需要在getConnect()中的process(WatchedEvent watchedEvent)方法里再次调用
- List<String> children = zk.getChildren("/servers", true);
-
- ArrayList<String> servers = new ArrayList<>();
-
- for (String child : children) {
- byte[] data = zk.getData("/servers/" + child, false, null);
- servers.add(new String(data));
- }
-
- System.out.println(servers);
-
- }
-
- private void getConnect() throws Exception {
-
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
- try {
- getServerList();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
-
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(4)测试
a.启动DistributeClient并查看DistributeClient控制台
解释:第一个[]是getServerList()方法的,第二个[]是getConnect()的process(WatchedEvent watchedEvent)功劳。同时getConnect()的process(WatchedEvent watchedEvent)方法还会在每次节点变化的时候都调用一次。
b.在192.168.23.130上zk的客户端/servers 目录上创建临时带序号节点
c.观察此刻idea DistributeClient控制台变化
d.在192.168.23.130上zk的客户端/servers 目录上执行删除操作
e.观察 Idea DistributeClient控制台变化
上面是开启 DistributeClient在linux中操作增减服务器。
下面是开启 DistributeServer在idea中操作增减服务器。
f.下面启动DistributeServer并观察 Idea DistributeServer控制台变化
g.查看Idea DistributeClient控制台变化
h.修改内部参数,再开启一台DistributeServer并观察 Idea DistributeServer控制台变化
操作前提:允许并行。
i.查看Idea DistributeClient控制台变化
(1)分布式锁实现 :创建包com.example.zookeeper.case2,创建类DistributeLock
- package com.example.zookeeper.case2;
-
- import org.apache.zookeeper.*;
- import org.apache.zookeeper.data.Stat;
-
- import java.io.IOException;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
-
- public class DistributeLock {
-
- private String connectString = "192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181";
-
- // 连接需要时间,时间设长一点
- private int sessionTimeout = 200000;
-
- private ZooKeeper zk;
-
- private CountDownLatch connectLatch = new CountDownLatch(1);
-
- private CountDownLatch waitLatch = new CountDownLatch(1);
-
- private String waitPath;
-
- private String currentNode;
-
- public DistributeLock() throws Exception {
-
- // 获取连接
- zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
-
- @Override
- public void process(WatchedEvent watchedEvent) {
- // connectLatch 如果连接上zk 可以释放
- if (watchedEvent.getState() == Event.KeeperState.SyncConnected){
- connectLatch.countDown();
- }
-
- // waitLatch 需要释放
- if (watchedEvent.getType()== Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)){
- waitLatch.countDown();
- }
- }
- });
-
- // 等待zk正常连接后,往下走程序
- connectLatch.await();
-
- // 判断根节点/locks是否存在 watch:false为不开启监听
- Stat stat = zk.exists("/locks", false);
-
- if (stat == null) {
- // 创建一下根节点
- zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- // 对zk加锁
- public void zkLock() throws Exception{
- // 创建对应的临时带序号节点
- currentNode =
- zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-
- // wait一小会, 让结果更清晰一些
- Thread.sleep(10);
-
- // 判断创建的节点是否是最小的序号节点,如果是则获取到锁。如果不是,监听序号前一个节点。
- List<String> children = zk.getChildren("/locks", false);
- // 如果children只有一个值,那就直接获取锁,如果有多个节点,需要判断谁最小
- if (children.size() == 1) {
- return;
- }else {
- Collections.sort(children);
-
- // 获取节点名称 seq-00000000
- String thisNode = currentNode.substring("/locks/".length());
- // 通过seq-00000000获取该节点在children集合的位置
- int index = children.indexOf(thisNode);
-
- // 判断
- if (index == -1) {
- System.out.println("数据异常");
- } else if (index == 0) {
- // 就一个节点,可以获取锁了
- return;
- } else {
- // 需要监听 他前一个节点变化
- waitPath = "/locks/" + children.get(index - 1);
- zk.getData(waitPath,true,new Stat());
-
- // 等待监听
- waitLatch.await();
-
- return;
- }
- }
- }
-
- // 解锁
- public void unZkLock() throws Exception {
-
- // 删除节点
- zk.delete(this.currentNode,-1);
- }
-
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(1)分布式锁测试 :在包com.example.zookeeper.case2下,创建类DistributedLockTest
- package com.example.zookeeper.case2;
-
- import org.apache.zookeeper.KeeperException;
-
- import java.io.IOException;
-
- public class DistributedLockTest {
-
- public static void main(String[] args) throws Exception {
-
- final DistributeLock lock1 = new DistributeLock();
-
- final DistributeLock lock2 = new DistributeLock();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
-
- try {
- lock1.zkLock();
- System.out.println("线程1 启动,获取到锁");
- Thread.sleep(5 * 1000);
-
- lock1.unZkLock();
- System.out.println("线程1 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.zkLock();
- System.out.println("线程2 启动,获取到锁");
- Thread.sleep(5 * 1000);
-
- lock2.unZkLock();
- System.out.println("线程2 释放锁");
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
(3)观察控制台变化:
(1)原生的 Java API 开发存在的问题
a.会话连接是异步的,需要自己去处理。比如使用 CountDownLatch
详情见官网:Apache Curator –
(3)Curator 案例实操
a.添加依赖
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>4.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>4.3.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-client</artifactId>
- <version>4.3.0</version>
- </dependency>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
b.代码实现:
- package com.example.zookeeper.case3;
-
- import org.apache.curator.framework.CuratorFramework;
- import org.apache.curator.framework.CuratorFrameworkFactory;
- import org.apache.curator.framework.recipes.locks.InterProcessMutex;
- import org.apache.curator.retry.ExponentialBackoffRetry;
-
- public class CuratorLockTest {
-
- public static void main(String[] args) {
-
- // 创建分布式锁1
- InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- // 创建分布式锁2
- InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock1.acquire();
- System.out.println("线程1 获取到锁");
-
- lock1.acquire();
- System.out.println("线程1 再次获取到锁");
-
- Thread.sleep(5 * 1000);
-
- lock1.release();
- System.out.println("线程1 释放锁");
-
- lock1.release();
- System.out.println("线程1 再次释放锁");
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
-
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- lock2.acquire();
- System.out.println("线程2 获取到锁");
-
- lock2.acquire();
- System.out.println("线程2 再次获取到锁");
-
- Thread.sleep(5 * 1000);
-
- lock2.release();
- System.out.println("线程2 释放锁");
-
- lock2.release();
- System.out.println("线程2 再次释放锁");
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }).start();
- }
-
- private static CuratorFramework getCuratorFramework() {
-
- ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
-
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.23.130:2181,192.168.23.150:2181,192.168.23.160:2181")
- .connectionTimeoutMs(200000)
- .sessionTimeoutMs(200000)
- .retryPolicy(policy).build();
-
- // 启动客户端
- client.start();
-
- System.out.println("zookeeper 启动成功");
- return client;
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
c.观察控制台
1.CAP理论:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。