赞
踩
Zookeeper服务器是用Java创建的,它运行在JVM上。需要安装JDK7或更高版本。
zookeeper官网:https://zookeeper.apache.org/
本次下载的是官网推荐版本
我们建议您下载以下站点:
https://dlcdn.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
下载完成后,上传到服务器。
解压到指定目录,这里以/opt/zookeeper/为例
tar -zxvf apache-zookeeper-3.7.1-bin.tar.gz
创建目录,用于存储zookeeper数据 zkdata/
mkdir zkdata
进入apache-zookeeper-3.7.1-bin/conf/目录下面,拷贝zoo_sample.cfg到zoo.cfg
cp zoo_sample.cfg zoo.cfg
# 因为配置文件改成zoo.cfg才能生效。
默认端口:2181
配置zoo.cfg文件中的dataDir
# 存储zookeeper数据的位置
dataDir=/opt/zookeeper/zkdata
启动zookeeper服务
cd /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/
#启动
./zkServer.sh start
# 提示STARTED 表示启动成功
查看zookeeper状态
./zkServer.sh status
# standalone代表zk没有搭建集群,现在是单节点。
# It is probably not running 表示没有启动。
[root@localhost bin]# ./zkServer.sh version ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Apache ZooKeeper, version 3.7.1 2022-05-07 06:45 UTC [root@localhost bin]# ./zkServer.sh start ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@localhost bin]# ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Mode: standalone [root@localhost bin]# ./zkServer.sh restart ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED [root@localhost bin]# ./zkServer.sh stop ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Stopping zookeeper ... STOPPED [root@localhost bin]# ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /opt/zookeeper/apache-zookeeper-3.7.1-bin/bin/../conf/zoo.cfg Client port found: 2181. Client address: localhost. Client SSL: false. Error contacting service. It is probably not running. [root@localhost bin]#
启动:./zkServer.sh start
查看状态:./zkServer.sh status
停止:./zkServer.sh stop
重启:./zkServer.sh restart
启动客户端:./zkCle.sh -server ip:port 如:./zkCle.sh -server localhost:2181 退出客户端:quit 查看指定路径下的子节点:ls /zookeeper 新建节点:create /app2 给节点设置数据:set /app2 haha 新建节点,同时设置数据:create /app1 hehe 获取节点数据:get /app1 删除单个节点:delete /app2 删除包含子节点的节点:deleteall /app1 创建临时节点:create -e /app1 当前会话关闭后,该节点会自动删除 创建顺序节点:create -s /app1 持久化顺序节点app10000000003, app10000000004, app10000000005 创建临时顺序节点:create -es /app3 查看节点详细信息:ls -s / 旧版本中可以用ls2 /
package com.zxl.curator; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.List; public class CuratorTest { private CuratorFramework client; // 需要关闭防火墙 @Before public void testClient() { // // 重试策略 // RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); // // /** // * // * @param connectString 连接字符串。zk server 地址和端口"192.168.135.128:2181,192.168.135.128:2181”集群用逗号隔开 // * @param sessionTimeoutMs 会话超时时间 单位ms // * @param connectionTimeoutMs connection timeout 连接超时时间 单位ms // * @param retryPolicy retry policy to use 重试策略 // * @return client // */ // // 第一种方式 // client = CuratorFrameworkFactory.newClient("192.168.135.128:2181", // 60*1000,15*1000,retryPolicy); // // 开启连接 // client.start(); // 重试策略 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); // 第二种方式 client = CuratorFrameworkFactory.builder().connectString("192.168.135.128:2181") .sessionTimeoutMs(60*1000) .connectionTimeoutMs(15*1000) .retryPolicy(retryPolicy) .namespace("zxl") .build(); // namespace 设置名称空间后,后面的所有操作,默认都是以/zxl为根目录,并且会自动创建这个目录。 // 开启连接 client.start(); } // ==================================create================================== // 创建持久化单节点,并设置数据 @Test public void testCreate() throws Exception { // 默认类型:持久化 String path = client.create().forPath("/app2", "haah".getBytes()); System.out.println(path); } // 创建临时节点 @Test public void testCreateMode() throws Exception { // withMode设置类型,CreateMode.EPHEMERAL临时节点 String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3", "haah".getBytes()); System.out.println(path); } // 创建多级节点 @Test public void testCreate4() throws Exception { // .creatingParentsIfNeeded()创建多级节点,如果父节点不存在,则创建父节点 String path = client.create().creatingParentsIfNeeded().forPath("/app1/p1", "haah".getBytes()); System.out.println(path); } // ==================================get================================== // 查询指定目录的子节点 @Test public void testGet() throws Exception { List<String> strings = client.getChildren().forPath("/"); System.out.println(strings); } // 查询指定节点的数据 @Test public void testGetData() throws Exception { byte[] bytes = client.getData().forPath("/app2"); System.out.println(new String(bytes)); } // 查询指定节点的状态 @Test public void testGetStatus() throws Exception { Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app2"); System.out.println(stat); } // ==================================set================================== // 设置值,如果forPath只有一个地址,则默认设置该节点数据为IP @Test public void testSet() throws Exception { Stat stat = client.setData().forPath("/app2"); System.out.println(stat); // 57,62,1660185902972,1660186251961,1,0,0,0,13,0,57 } // 修改节点的值 @Test public void testSetDate() throws Exception { Stat stat = client.setData().forPath("/app2","bbb".getBytes()); System.out.println(stat); } // 根据版本修改,保证原子性操作 @Test public void testSetByVersion() throws Exception { // 创建stat,记录节点状态信息 Stat stat = new Stat(); // 将查出来的节点信息封装到stat中 client.getData().storingStatIn(stat).forPath("/app2"); // 获取当前节点的版本,目的是上其他客户端或其他线程不干扰该操作。 int version = stat.getVersion(); client.setData().withVersion(version).forPath("/app2","haha".getBytes()); } // ==================================delete================================== // 删除节点 @Test public void testDelete() throws Exception { client.delete().forPath("/abc"); } // 删除有子节点的节点 @Test public void testDelete2() throws Exception { client.delete().deletingChildrenIfNeeded().forPath("/app4"); } // 必须成功的删除,如果有网络抖动导致删除失败,则会重试 @Test public void testDelete3() throws Exception { client.delete().guaranteed().forPath("/app4"); } // 删除后回调 @Test public void testDelete4() throws Exception { client.delete().guaranteed().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 回调函数的具体操作 System.out.println("我被删除了"); System.out.println(curatorEvent); // 根据resultCode=的值,判断删除是否成功 } }).forPath("/app1"); } // 关闭客户端 @After public void testClose() { if (client != null) client.close(); } }
Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性。
Zookeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象自身状态变化时,会通知所有订阅者。
Curator引入了Cache来实现对Zookeeper服务端事件的监听。
Zookeeper提供了三种Watcher:
// 创建NodeCache对象
final NodeCache nodeCache = new NodeCache(client,"/app1");
// 注册监听
nodeCache.getListenable().addListener(new NodeCacheListener(){
@Override
public void nodeChanged() throws Exception {
System.out.println("节点变化了~");
// 获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
// 开启监听 ,如果设置为true,则开启监听时,加载缓冲数据
nodeCache.start(true);
@Test public void testWatcher() throws Exception { // 创建PathChildrenCache对象 PathChildrenCache pcc = new PathChildrenCache(client,"/app1",true); // 注册监听 pcc.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("节点变化了"); // 获取修改节点后的数据 List<ChildData> currentData = pcc.getCurrentData(); for (ChildData cd : currentData) { System.out.println(new String(cd.getData())); } System.out.println(event); // 监听子节点的数据变更,并且拿到变更后的数据。 PathChildrenCacheEvent.Type type = event.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { byte[] data = event.getData().getData(); System.out.println(new String(data)); } } }); // lambda表达式 /* pcc.getListenable().addListener((client1, event) -> { System.out.println("节点变化了"); // 获取修改节点后的数据 List<ChildData> currentData = pcc.getCurrentData(); for (ChildData cd : currentData) { System.out.println(new String(cd.getData())); } }); */ // 开启监听 ,如果设置为true,则开启监听时,加载缓冲数据 pcc.start(); // 为了让客户端不会关闭 while (true){ } }
在Curator中有五种锁方案:
// 初始化锁
InterProcessMutex lock = new InterProcessMutex(client,"/lock");
// 获取锁
lock.acquire(3,TimeUnit.SECONDS);
// 释放锁
lock.release();
Leader选举:
# 在每个配置文件中配置如下列表
server.1=192.168.135.127:2881:3881
server.2=192.168.135.128:2881:3881
server.3=192.168.135.129:2881:3881
# 解释:server.服务器id=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
# 客户端和服务端连接默认端口是:2181
# 服务器和服务器通信(数据同步)默认端口是:2881
# 服务器之间投票选举默认端口是:3881
3台zookeeper服务器组成集群,服务id分别为1、2、3;依次启动机器。2号为Leader,1和3为从服务器
在Zookeeper集群服务中有三个角色:
总结:因为Follower的读取压力比较大,所以Observer帮助处理非事务的请求,但是不参与投票。
当跟随者和观察者收到事务请求,会将请求转发到Leader处理,Leader处理完成后,将数据同步到跟随者和观察者。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。