赞
踩
主要内容
Zookeeper 简介
Zookeeper 存储结构
监听通知
安装 Zookeeper
Zookeeper 常用命令
使用 Java API 操作 Zookeeper
Zookeeper 实战
Zookeeper 官网:http://zookeeper.apache.org/
Zookeeper 是 Apache 的一个分布式服务框架,是 Apache Hadoop 的一个子项目。官方 文档上这么解释 Zookeeper,它主要是用来解决分布式应用中经常遇到的一些数据管理问题, 如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。
简单来说 zookeeper=文件系统+监听通知机制。
在 Zookeeper 中,znode 是一个跟 Unix 文件系统路径相似的节点,可以向节点存储 数据或者获取数据。
Zookeeper 底层是一套数据结构。这个存储结构是一个树形结构,其上的每一个节点, 我们称之为“znode”
Zookeeper 中的数据是按照“树”结构进行存储的。而且 znode 节点还分为 4 中不同 的类型。
每一个 znode 默认能够存储 1MB 的数据(对于记录状态性质的数据来说,够了)
可以使用 zkCli 命令,登录到 Zookeeper 上,并通过 ls、create、delete、get、set 等命令操作这些 znode 节点。
客户端与 zookeeper 断开连接后,该节点依旧存在。
客户端与 zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行 顺序编号。
客户端与 zookeeper 断开连接后,该节点被删除。
客户端与 zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺 序编号。
Zookeeper 是使用观察者设计模式
来设计的。当客户端注册监听它关心的目录节点时, 当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时,Zookeeper 会通知客户端。
官方资源包可在 zookeeper.apache.com 站点中下载。这里安装版本为:3.6.0。
配置环境变量
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
[root@localhost temp]# tar -zxf zookeeper-3.6.0.tar.gz
[root@localhost temp]# cp zookeeper-3.6.0 /usr/local/zookeeper -r
1. bin:放置运行脚本和工具脚本,
2. conf:zookeeper 默认读取配置的目录,里面会有默认的配置文件
3. docs:zookeeper 相关的文档
4. lib:zookeeper 核心的 jar
5. logs:zookeeper 日志
Zookeeper 在启动时默认的去 conf 目录下查找一个名称为 zoo.cfg 的配置文件。
在 zookeeper 应用目录中有子目录 conf。其中有配置文件模板:zoo_sample.cfg
cp zoo_sample.cfg zoo.cfg。zookeeper 应用中的配置文件为 conf/zoo.cfg。
修改配置文件 zoo.cfg - 设置数据缓存路径
默认加载配置文件:./zkServer.sh start:默认的会去 conf 目录下加载 zoo.cfg 配置文件。
指定加载配置文件:./zkServer.sh start 配置文件的路径
。
./zkServer.sh stop
./zkServer.sh status
bin/zkCli.sh
默认连接地址为本机地址,默认连接端口为 2181
Ctrl+C 退出连接
bin/zkCli.sh -server ip:port 连接指定 IP 地址与端口
Zookeeper 集群中的角色主要有以下三类
使用 3 个 Zookeeper 应用搭建一个伪集群。应用部署位置是:192.168.233.130。客户端 监听端口分别为:2181、2182、2183。投票选举端口分别为 2881/3881、2882/3882、2883/3883。
tar -zxf zookeeper-3.6.0.tar.gz
将解压后的 Zookeeper 应用目录重命名,便于管理
mv zookeeper-3.6.0 zookeeper01
在 zookeeper01 应用目录中,创建 data 目录,用于缓存应用运行数据
cd zookeeper01
mkdir data
复制两份 Zookeeper 应用。用于模拟集群中的 3 个节点。
cp -r zookeeper01 zookeeper02
cp -r zookeeper01 zookeeper03
在 zookeeper 应用目录中有子目录 conf。其中有配置文件模板:zoo_sample.cfg
cp zoo_sample.cfg zoo.cfg
zookeeper 应用中的配置文件为 conf/zoo.cfg
。
dataDir 参数值为应用运行缓存数据保存目录。
在 Zookeeper 集群中,每个节点需要一个唯一标识。这个唯一标识要求是自然数。且唯 一标识保存位置是:数据缓存目录(dataDir=/usr/local/zookeeper/data
)的myid
文件中。其中“数据缓存目录”为配置文件 zoo.cfg 中的配置参数
在 data 目录中创建文件myid : touch myid
为应用提供唯一标识。本环境中使用 1、2、3 作为每个节点的唯一标识。
vi myid
简化方式为: echo [唯一标识] >> myid
。 echo 命令为回声命令,系统会将命令发送的 数据返回。'>>'
为定位,代表系统回声数据指定发送到什么位置。 此命令代表系统回声数 据发送到 myid 文件中。 如果没有文件则创建文件。
vim zoo.cfg
clientPort=2181 #服务端口根据应用做对应修改,zk01-2181,zk02-2182,zk03-2183
server.1=192.168.233.130:2881:3881
server.2=192.168.233.130:2882:3882
server.3=192.168.233.130:2883:3883
在 Linux 中可以使用 chmod 命令为文件授权。
chmod 777 文件名
777 表示为文件分配可读,可写,可执行权限。
zookeeper01/bin/zkServer.sh start
zookeeper02/bin/zkServer.sh start
zookeeper03/bin/zkServer.sh start
zookeeper01/bin/zkServer.sh stop
zookeeper02/bin/zkServer.sh stop
zookeeper03/bin/zkServer.sh stop
可以使用任何节点中的客户端工具连接集群中的任何节点。
./zkCli.sh -server 192.168.233.130:2183
ls /path
使用 ls 命令查看 zookeeper 中的内容。在 ZooKeeper 控制台客户端中,没有默认列表功 能,必须指定要列表资源的位置。 如:ls /
或者ls /path
create [-e] [-s] /path [data]
使用 create 命令创建一个新的 Znode。create [-e] [-s] path data -
创建节点,如: create /test 123
创建一个/test
节点,节点携带数据信息 123。create -e /test 123
创建一个临时节 点/test
,携带数据为 123,临时节点只在当前会话生命周期中有效,会话结束节点自动删除。 create -s /test 123
创建一个顺序节点/test,携带数据 123,创建的顺序节点由 ZooKeeper 自 动为节点增加后缀信息,如-/test00000001
等。-e
和-s
参数可以联合使用。
get [-s] /path
get 命令获取 Znode 中的数据。
get -s /path
-s
查看 Znode 详细信息
oldlu:存放的数据
cZxid:创建时 zxid(znode 每次改变时递增的事务 id)
ctime:创建时间戳
mZxid:最近一次更近的 zxid
mtime:最近一次更新的时间戳
pZxid:子节点的 zxid
cversion:子节点更新次数
dataversion:节点数据更新次数
aclVersion:节点 ACL(授权信息)的更新次数
ephemeralOwner:如果该节点为 ephemeral 节点(临时,生命周期与 session 一样),
ephemeralOwner 值表示与该节点绑定的 session id. 如果该节点不是
ephemeral 节点, ephemeralOwner 值为 0.
dataLength:节点数据字节数
numChildren:子节点数量
set /path [data]
添加或修改 Znode 中的值
delete /path
删除 Znode。
该依赖为基于 Java 语言连接 Zookeeper 的客户端工具
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
/** * 操作Zookeeper的Znode */ public class ZnodeDemo implements Watcher { public static void main(String[] args) throws KeeperException, InterruptedException, IOException { //创建连接Zookeeper对象 ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ZnodeDemo()); //创建一个Znode String path = zooKeeper.create("/bjsxt/test","oldlu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(path); } /** * 事件通知回调方法 * @param event */ @Override public void process(WatchedEvent event) { //获取连接事件 if(event.getState() == Event.KeeperState.SyncConnected){ System.out.println("连接成功"); } } }
byte[] data= zooKeeper.getData("/bjsxt/test0000000001",new ZnodeDemo(),new Stat());
System.out.println(new String(data));
List<String> list = zooKeeper.getChildren("/bjsxt",new ZnodeDemo());
for(String path :list){
byte[] data= zooKeeper.getData("/bjsxt/"+path,new ZnodeDemo(),null);
System.out.println(new String(data));
}
//设置Znode中的值
// -1:表示匹配任何版本
Stat stat = zooKeeper.setData("/bjsxt/test0000000001","bjsxt".getBytes(),-1);
System.out.println(stat);
/** * 操作Zookeeper的Znode */ public class ZnodeDemo implements Watcher { public static void main(String[] args) throws KeeperException, InterruptedException, IOException { //创建连接Zookeeper对象 ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ZnodeDemo()); //创建一个Znode /* String path = zooKeeper.create("/bjsxt/test","oldlu".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(path);*/ //获取指定节点的数据 /*byte[] data= zooKeeper.getData("/bjsxt/test0000000001",new ZnodeDemo(),new Stat()); System.out.println(new String(data));*/ //获取指定节点中的所有子节点中的数据 /* List<String> list = zooKeeper.getChildren("/bjsxt",new ZnodeDemo()); for(String path :list){ byte[] data= zooKeeper.getData("/bjsxt/"+path,new ZnodeDemo(),null); System.out.println(new String(data)); }*/ //设置Znode中的值 // -1:表示匹配任何版本 /* Stat stat = zooKeeper.setData("/bjsxt/test0000000001","bjsxt".getBytes(),-1); System.out.println(stat);*/ zooKeeper.delete("/bjsxt/test0000000001",-1); } /** * 事件通知回调方法 * @param event */ @Override public void process(WatchedEvent event) { //获取连接事件 if(event.getState() == Event.KeeperState.SyncConnected){ System.out.println("连接成功"); } } }
实战案例介绍:使用 Zookeeper 与 RMI 技术实现一个 RPC 框架。
RPC:RPC(Remote Procedure Call)远程过程调用。
RMI(Remote Method Invocation) 远程方法调用。
RMI 是从 JDK1.2 推出的功能,它可以实现在一个 Java 应用中可以像调用本地方法一样 调用另一个服务器中 Java 应用(JVM)中的内容。
RMI 是 Java 语言的远程调用,无法实现跨语言。
Registry(注册表)是放置所有服务器对象的命名空间。 每次服务端创建一个对象时,它 都会使用 bind()
或 rebind()
方法注册该对象。 这些是使用称为绑定名称的唯一名称注册的。
要调用远程对象,客户端需要该对象的引用。即通过服务端绑定的名称从注册表中获取 对象(lookup()
方法)。
java.rmi.Remote
定义了此接口为远程调用接口。如果接口被外部调用,需要继承此接 口。
java.rmi.RemoteException
继承了 Remote 接口的接口,如果方法是允许被远程调用的,需要抛出此异常。
java.rmi.server.UnicastRemoteObject
此类实现了 Remote 接口和 Serializable 接口。
自定义接口实现类除了实现自定义接口还需要继承此类。
java.rmi.registry.LocateRegistry
可以通过 LocateRegistry 在本机上创建 Registry,通过特定的端口就可以访问这个 Registry。
java.rmi.Naming Naming
定义了发布内容可访问 RMI 名称。也是通过 Naming 获取到指定的远程方法。
/**
* 定义允许远程调用接口,该接口必须要实现Remote接口
* 允许被远程调用的方法必须要抛出RemoteException
*/
public interface DemoService extends Remote {
String demo(String str) throws RemoteException;
}
/**
* 接口实现类必须要继承UnicastRemoteObject。
* 会自动添加构造方法,需要修改为public
*/
public class DemoServiceImpl extends UnicastRemoteObject implements DemoService {
public DemoServiceImpl() throws RemoteException {
}
@Override
public String demo(String str) throws RemoteException {
return "Hello RMI "+str;
}
}
public class DemoServer {
public static void main(String[] args) throws RemoteException, AlreadyBoundException, MalformedURLException {
//将对象实例化
DemoService demoService = new DemoServiceImpl();
//创建本地注册表
LocateRegistry.createRegistry(8888);
//将对象绑定到注册表中
Naming.bind("rmi://localhost:8888/demoService",demoService);
}
}
/**
* 定义允许远程调用接口,该接口必须要实现Remote接口
* 允许被远程调用的方法必须要抛出RemoteException
*/
public interface DemoService extends Remote {
String demo(String str) throws RemoteException;
}
public class ClientDemo {
public static void main(String[] args) throws RemoteException, NotBoundException, MalformedURLException {
DemoService demoService = (DemoService)Naming.lookup("rmi://localhost:8888/demoService");
String result = demoService.demo("Bjsxt");
System.out.println(result);
}
}
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
public interface UsersService extends Remote {
String findUsers(String str) throws RemoteException;
}
public class UsersServiceImpl extends UnicastRemoteObject implements UsersService {
public UsersServiceImpl() throws RemoteException {
}
@Override
public String findUsers(String str) throws RemoteException {
return "Hello Zookeeper "+str;
}
}
public class ServerDemo implements Watcher {
public static void main(String[] args) throws IOException, AlreadyBoundException, KeeperException, InterruptedException {
UsersService usersService = new UsersServiceImpl();
LocateRegistry.createRegistry(8888);
String url ="rmi://localhost:8888/user";
Naming.bind(url,usersService);
//将url信息放到zookeeper的节点中
ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ServerDemo());
//创建Znode
zooKeeper.create("/bjsxt/service",url.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("服务发布成功");
}
}
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
</dependencies>
public interface UsersService {
String findUsers(String str);
}
public class ClientDemo implements Watcher { public static void main(String[] args) throws IOException, KeeperException, InterruptedException, NotBoundException { ZooKeeper zooKeeper = new ZooKeeper("192.168.233.130:2181,192.168.233.130:2182,192.168.233.130:2183",150000,new ClientDemo()); byte[] bytes = zooKeeper.getData("/bjsxt/service",new ClientDemo(),null); String url = new String(bytes); UsersService usersService = (UsersService) Naming.lookup(url); String result = usersService.findUsers("Bjsxt"); System.out.println(result); } @Override public void process(WatchedEvent event) { if(event.getState() == Event.KeeperState.SyncConnected){ System.out.println("连接成功"); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。