赞
踩
在分布式组件里面,Zookeeper是一个为其他分布式服务提供协调服务的中间件。
服务包含:统一配置管理、统一命名服务、分布式锁、集群管理、分布式队列、数据发布订阅…
本质上可以归纳为两类:
1.基于Znode树的数据管理
2.数据的通知和监听
Zookeeper启动时,它会创建一个分层的命名空间,称为Znode树。Znode是Zookeeper中的基本数据节点,类似于文件系统中的文件夹。接下来,当应用程序连接到Zookeeper时,它可以使用Zookeeper提供的API将自己的信息存储在Znode上。这个过程被称为注册。
统一配置管理
在配置管理服务中,应用程序可以使用Zookeeper提供的API轻松读取和更改配置。Zookeeper允许多个应用程序同时监听某个Znode,从而使它们能够及时获得配置更改通知。
统一命名服务
在命名服务中,应用程序可以使用Zookeeper提供的API在特定的Znode路径下注册或查找它们的服务名称。Zookeeper会将这些服务名称分配给一个全局唯一的标识符,并允许其他应用程序查找它们。
分布式锁
在分布式锁管理中,应用程序可以使用Zookeeper提供的API创建锁节点。只有一个应用程序能够成功地在该节点上创建子节点,从而获得锁。其他应用程序必须等待当前持有锁的应用程序释放锁,然后才有机会获取锁并执行相应的任务。
集群管理
集群状态管理:所有服务节点创建临时节点,然后监听子节点的变化消息。比如一旦有机器挂掉,临时节点删除,其他节点收到通知。
集群负载均衡:所有服务节点创建临时节点,zookeeper根据负载均衡算法将任务分发到各个机器节点中处理。
集群leader选举:多个客户端请求创建同一个临时节点,最终只有一个客户端请求能够创建成功。
分布式队列
在分布式队列中,应用程序可以使用Zookeeper提供的API将任务添加到队列中,然后由不同的工作进程处理这些任务。当一个任务被添加到队列中时,它将作为一个Znode被创建。多个工作进程可以同时监听该Znode,以便及时获取新任务的通知。
数据发布订阅
Zookeeper 提供了一种称为“Watcher 机制”的发布/订阅模式,用于实现数据的发布和订阅。具体来说,Zookeeper 允许客户端在某个节点上设置 Watcher,当该节点的状态发生变化时,Zookeeper 会通知所有订阅该节点的客户端,从而实现数据的发布/订阅。
下面是 Zookeeper 实现数据发布/订阅的步骤:
- 客户端向 Zookeeper 服务器注册一个 Watcher,指定要监控的节点路径。
- 当该节点的状态发生变化(例如节点数据被修改、节点删除等),Zookeeper 会将变化通知所有注册了 Watcher 的客户端。
- 客户端接收到 Watcher 通知后,可以采取相应的操作,例如重新获取节点数据、更新本地缓存等。
- 如果客户端不再需要订阅节点的变化,可以取消 Watcher 注册。
总体来说,Zookeeper通过提供基于Znode树的数据存储和一组强大的API,使得这些服务变得非常容易实现。应用程序只需要使用这些API来注册、读取和创建Znode,就可以使用Zookeeper提供的各种服务。
前提:安装了JDK
下载zookeeper安装包apache-zookeeper-3.7.1-bin.tar.gz,解压
zoo_sample.cfg 复制一份改名为zoo.cfg
# zookeeper时间配置中的基本单位(毫秒) tickTime=2000 # 允许follower初始化连接到leader的最大时长,它表示tickTime的时间倍数,即:initLimit*tickTime initLimit=10 # 允许follower与leader数据同步最大时长,它表示tickTime的时间倍速,即:syncLimit*tickTime syncLimit=5 # zookeeper数据存储目录及日志保存目录(如果没有指明dataLogDir,则日志也保存在这个文件中) dataDir=/java/myzookeeper/data # 对客户端提供的端口号 clientPort=2181 # 单个客户端与zookeeper最大并发连接数 maxClientCnxns=60 # 保存的数据快照数量,之外的将会被清除 autopurge.snapRetainCount=3 # 自动触发清除任务时间间隔,单位为小时,默认为0,表示不自动清除 autopurge.purgeInterval=1
cd /java/myzookeeper
重启zk服务器:
./bin/zkServer.sh start ./conf/zoo.cfg
查看zk服务器的状态:
./bin/zkServer.sh status ./conf/zoo.cfg
停止服务器:
./bin/zkServer.sh stop ./conf/zoo.cfg
zk中的数据是保存在节点上的,节点就是znode,多个znode之间构成一棵树的目录结构。类似于数据结构中的树,同时也很像文件系统的目录
这样的层级结构,让每一个Znode的节点拥有唯一的路径,就像命名空间一样对不同信息做出清晰的隔离。
# 查看根节点
ls /
zk中的znode包含了四个部分
data:保存数据
acl:权限:
c:create 创建权限,允许在该节点下创建子节点
w:write 更新权限,允许更新该节点的数据
r:read 读取权限,允许读取该节点的内容以及子节点的列表信息
d:delete 删除权限,允许删除该节点的子节点信息
a:admin 管理者权限,允许对该节点进行acl权限设置
stat:描述当前znode的元数据
child:当前节点的子节点
1、持久节点:创建出的节点,在会话结束后依然存在。保存数据
2、持久序号节点:创建出的节点,根据先后顺序,会在节点之后带上一个数值,越后执行数值越大,适用于分布式锁的应用场景-单调递增
3、临时节点:临时节点是在会话结束后,自动被删除的,通过这个特性,zk可以实现服务注册与发现的效果。
4、临时序号节点:跟持久序号节点相同,适用于临时的分布式锁
5、Container节点(3.5.3版本新增):Container容器节点,当容器中没有任何子节点,该容器节点会被zk定期删除
6、TTL节点:可以指定节点的到期时间,到期后被zk定时删除。只能通过系统配置zookeeper.extendedTypeEnablee=true开启
zk的数据是运行在内存中,zk提供了两种持久化机制:
- 事务日志
zk把执行的命令以日志形式保存在dataLogDir指定的路径中的文件中(如果没有指定dataLogDir,则按照 dataDir指定的路径)
- 数据快照
zk会在一定的时间间隔内做一次内存数据快照,把时刻的内存数据保存在快照文件中
zk通过两种形式的持久化,在恢复时先恢复快照文件中的数据到内存中,再用日志文件中的数据做增量恢复,这样恢复的速度更快。
创建持久节点
create /xxx
创建持久序号节点
create -s /xxx
创建临时节点
create -e /xxx
创建临时序号节点
create -e -s /xxx
创建容器节点
create -c /xxx
普通查询
# 本级查询
ls /xxx
# 递归查询
ls -R /xxx
查询节点的内容
get /xxx
查询节点信息
get -s /xxx
- cZxid:创建节点的事务ID
- mZxid:修改节点的事务ID
- pZxid:添加和删除子节点的事务ID
- ctime:节点创建的时间
- mtime:节点最近修改的时间
- dataVersion:节点内数据的版本,每更新一次数据,版本会+1
- aclVersion:此节点的权限版本
- ephemeralOwner:如果当前节点是临时节点,该是是当前节点所有者的session id。如果节点不是临时节点,则该值为零
- dataLength:节点内数据的长度
- numChildren:该节点的子节点个数
普通删除
# 删除节点,当有子节点,删除失败,要用下面的命令
delete /xxx
# 删除节点以及子节点
deleteall /xxx
乐观锁删除
#只有指定删除的数据版本 == 当前节点的数据版本号,才能够删除成功,因为每对节点进行一次数据操作,节点的dataVersion就会+1。这样删除,通过乐观锁保证并发下数据操作的唯一性
delete -v dataVersion
注册当前会话的账号和密码:
addauth digest xiaowang:123456
创建节点并设置权限(指定该节点的用户,以及用户所拥有的权限s)
create /test-node abcd auth:xiaowang:123456:cdwra
在另一个会话中必须先使用账号密码,才能拥有操作节点的权限
Curator是Netflix公司开源的一套zookeeper客户端框架,Curator是对Zookeeper支持最好的客户端框架。Curator封装了大部分Zookeeper的功能,比如Leader选举、分布式锁等,减少了技术人员在使用Zookeeper时的底层细节开发工作。
~~~ xml <!--Curator--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> <!--Zookeeper--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.1</version> </dependency> ~~~
curator.retryCount=5 # 重试次数
curator.elapsedTimeMs=5000 # 临时节点的超时时间
curator.connectionString=172.0.0.1:2181 # 连接地址
curator.sessionTimeoutMs=60000 # session超时时间
curator.connectionTimeoutMs=5000 # 连接超时时间
@Data @Component @ConfigurationProperties(prefix = "curator") public class WrapperZK { private int retryCount; private int elapsedTimeMs; private String connectionString; private int sessionTimeoutMs; private int connectionTimeoutMs; } //引用配置类 @Configuration public class CuratorConfig { @Autowired private WrapperZK wrapperZK; @Bean(initMethod = "start") public CuratorFramework curatorFramework(){ return CuratorFrameworkFactory.newClient( wrapperZK.getConnectionString(), wrapperZK.getSessionTimeoutMs(), wrapperZK.getConnectionTimeoutMs(), new RetryNTimes(wrapperZK.getRetryCount(), wrapperZK.getElapsedTimeMs()) ); } }
@Autowired private CuratorFramework curatorFramework; @Test //添加节点 void createNode() throws Exception{ //添加默认(持久)节点 String path = curatorFramework.create().forPath("/curator-node"); //添加临时序号节点 //String path2 = curatorFramework.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-nodes", "messageDate".getBytes()); System.out.println(String.format("curator create node :%s successfully!", path)); // System.in.read(); } @Test //获取节点值 void getDate() throws Exception { byte[] bttes = curatorFramework.getData().forPath("/curator-node"); System.out.println("bttes = " + bttes); } @Test //设置节点值 void setDate() throws Exception { curatorFramework.setData().forPath("/curator-node", "newMessage".getBytes()); byte[] bytes = curatorFramework.getData().forPath("/curator-node"); System.out.println("bytes = " + bytes); } @Test //创建多级节点 void createWithParent() throws Exception { String pathWithParent = "/node-parent/sub-node-1"; String path = curatorFramework.create().creatingParentContainersIfNeeded().forPath(pathWithParent); System.out.println(String.format("curator create node :%s success!", path)); } @Test //删除节点 void delete() throws Exception { String path = "/node-parent"; //删除节点的同时一并删除子节点 curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(path); }
读锁(读锁共享):大家都可以读。上锁前提:之前的锁没有写锁
写锁(写锁排他):只有得到写锁的才能写。上锁前提:之前没有任何锁
读数据并不会对数据本身产生影响所以可以同时读,写数据说明数据发生了变化,这个时候就不能读数据了
创建一个临时序号节点,节点的数据是read,表示是读锁
获取当前zk中序号比自己小的所有节点
判断最小节点是否是读锁
如果用上述的上锁方式,只要有节点发生变化,就会触发其他节点的监听事件,这样对zk的压力非常大,而羊群效应,可以调整成链式监听。解决这个问题。
@Test void testGetReadLock()throws Exception{ //读写锁 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1"); //获取读锁对象 InterProcessLock interProcessLock = interProcessReadWriteLock.readLock(); System.out.println("等待获取读锁对象中..."); //获取锁 interProcessLock.acquire(); for(int i = 1; i <= 100; i ++){ Thread.sleep(3000); System.out.println(i); } //释放锁 interProcessLock.release(); System.out.println("等待释放锁..."); }
@Test void testGetWriteLock()throws Exception{ //读写锁 InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(client, "/lock1"); //获取写锁对象 InterProcessLock interProcessLock = interProcessReadWriteLock.writeLock(); System.out.println("等待获取写锁对象中..."); //获取锁 interProcessLock.acquire(); for(int i = 1; i <= 100; i ++){ Thread.sleep(3000); System.out.println(i); } //释放锁 interProcessLock.release(); System.out.println("等待释放锁..."); }
我们可以把Watch理解成是注册在特定Znode上的触发器。当这个Znode发生改变,也就是调用了create,delete,setData方法的时候,将会触发Znode上注册的对应事件,请求Watch的客户端会收到异步通知。
具体交互过程如下:
客户端调用getData方法,watch参数是true。服务端接到请求,返回节点数据,并且在对应的哈希表里插入被Watch的Znode路径,以及Watcher列表。
1.监听节点数据变化:get -w /xxx
2.客户端的监听只生效一次。如果想持续监听,需要在每次监听信息打印后,查看数据的时候,再使用:get -w /xxx
3.在被监听的节点上创建子节点,watch监听事件不会被触发
当被watch的znode已删除,服务端会查找哈希表,找到znode对应的所有watcher,异步通知客户端,并删除哈希表中对应的key-value。
客户端使用了NIO的通信模式监听服务端的调用
create /test1 aaa
get -w /test1 一次性监听节点
ls -w /test1 监听目录,创建和删除子节点会收到通知。但是子节点中新增节点不会被监听到
ls -R -w /test1 监听子节点中节点的变化,但内容的变化不会收到通知
@Test public void addNodeListener() throws Exception{ NodeCache nodeCache = new NodeCache(curatorFramework,"/curator-node"); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception{ log.info("{} path nodeChanged: ", "/curator-node"); printNodeData(); } )}; nodeCache.start(); //System.in.read(); } public void printNodeData() throws Exception{ byte[] bytes = curatorFramework.getData().forPath("/curator-node"); log.info("data: {}", new String(bytes)); }
zookeeper集群中的节点有三种角色
搭建4个节点,其中一个节点为Observer
在usr/local/zookeeper中创建一下四个文件
/usr/local/zookeeper/data/zk1# echo 1 > myid
/usr/local/zookeeper/data/zk2# echo 2 > myid
/usr/local/zookeeper/data/zk3# echo 3 > myid
/usr/local/zookeeper/data/zk4# echo 4 > myid
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. 修改对应的zk1 zk2 zk3 zk4 dataDir=/usr/local/zookeeper/zkdata/zk1 # the port at which the clients will connect clientPort=2181 #2001为集群通信端口,3001为集群选举端口,observer(观察者身份) server.1=127.0.0.1:2001:3001 server.2=127.0.0.1:2002:3002 server.3=127.0.0.1:2003:3003 server.4=127.0.0.1:2004:3004:observer
启动
观察角色
#在 ZooKeeper 集群中,客户端不需要显式地连接观察者节点。因为观察者节点只是提供额外的备份服务,并且不参与写操作的投票,所以客户端只需要连接到主节点和从节点即可。
./bin/zkCli.sh -server 127.0.0.12181,127.0.0.1:2182,127.0.0.1:2183
zookeeper作为非常重要的分布式协调组件,需要进行集群部署,集群中会以一主多从的形式进行部署。zookeeper为了保证数据的一致性,使用了ZAB(Zookeeper Atomic Broadcast)协议,这个协议解决了Zookeeper的崩溃恢复和主从数据同步的问题。
选票规则:
myid和zxid,先比较zXid看谁大,如果一样大就比较myid谁大
简单理解:
投票超过集群数一半就选定为leader
阶段一:broker_1启动,此时broker_1生成一张选票
阶段二:broker_2启动,broker_2和broker_1比较,发现zxid一样, broker的myid大于broker_1, 所以各自节点都会有两个选票都是broker_2,此时票数超过一半,所以broker_2当选为leader
阶段三:broker_3启动,此时leader已经选举,所以为follower.
阶段四:boroker_4启动,作为observer
Leader建立完后,Leader周期性地不断向Follower发送心跳(ping命令,没有内容的socket)。当Leader崩溃后,Follower发现socket通道已关闭,于是Follower开始进入到Looking状态,重新回到上一节中的Leader选举状态,此时集群不能对外提供服务。
leader收到半数以上follower的ack,就发送commit(向所有follower,和自己)
为什么要半数以上?
提升整个集群写数据的性能。因为集群中3台节点,有两台都写成功了,说明网络通信基本正常,集群能够持续提供服务
半数,指的是整个集群所有节点的半数
也可以理解成分布式事务中的两阶段提交
2000年7月,加州大学伯克利分校的 Eric Brewer教授在ACM PODC会议上提出CAP猜想。2年后,麻省理工学院的Seth Gilbert和 Nancy Lynch 从理论上证明了CAP。之后,CAP理论正式成为分布式计算领域的公认定理。
CAP理论为:一个分布式系统最多只能同时满足一致性(Consistency)、可用性(Availability)和区分容错性(Partition tolerance)这三项中的两项。
一致性指"all nodespsee the same data at the same time",即更新操作成功并返回客户端完成后,所有节点在同一时间的数据完全一致。
可用性指"Reads and writes always succeed",即服务一直可用,而且是正常响应时间。
分区容错性指"the system continues to operate despite arbitrary message loss or failure of part of the system",即分布式系统在遇到某节点或网络分区故障的时候,仍然能够对外提供满足一致性或可用性的服务。——避免单点故障,就要进行冗余部署,冗余部署相当于是服务的分区,这样的分区就具备了容错性。
eBay的架构师Dan Pritchett源于对大规模分布式系统的实践总结,在ACM上发表文章提出BASE理论,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性《Strong Consistency,CAP的一致性就是强一致性),但应用可以采用适合的方式达到最终一致性(Eventual Consitency) 。
基本可用是指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。
电商大促时,为了应对访问量激增,部分用户可能会被引导到降级页面,服务层也可能只提供降级服务。这就是损失部分可用性的体现。
软状态是指允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据至少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。mysql replication的异步复制也是一种体现。
最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的—种特殊情况。
ZooKeeper是弱一致性,能保证最终一致性。
zookeeper使用的ZAB协议进行主从数据同步,ZAB协议认为只要是过半数节点写入成为,数据就算写成功了,然后会告诉客户端A数据写入成功,如果这个时候客户端B恰好访问到还没同步最新数据的zookeeper节点,那么读到的数据就是不一致性的,因此zookeeper无法保证写数据的强一致性,只能保证最终一致性,而且可以保证同一客户端的顺序一致性。
但也可以支持强一致性,通过sync()方法与Leader节点同步后可保证当前节点数据与Leader一致。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。