赞
踩
zab广播模式工作原理:通过类似两阶段提交协议的方式解决数据一致性。
集群写操作具体流程:
简单理解Leader选取
①、服务器状态
②服务器启动时期的leader选举
简单来说:因为集群初次启动时每个服务器的事务ID(zxid)都为0,所以比较的是myid。比如第一次启动2181,然后在启动2182,此时两者相互通信,得出2182的myid大更合适,并且已经满足超过集群半数的服务器支持2182,所以选择2182作为Leader。当然,如果是先启动2181和2183就选取2183作为Leader。
③服务器运行时期的leader选举
此处的zxid也就是事务id是靠leader在接收了来自客户端的写操作给follower分配的。
导致follower间事务id不一致的原因就是,比如2182在宕机前总共接收了2次写操作,第一个给每个follower分配的事务id都是122,而2182在接受第二次写操作并且给2181分配了事务id125后就宕机了,所以此时2181和2183的事务id就不一致了。
所以选择follower中最大的事务id,可以保留leader在宕机之前的最后一次事务信息也就是最后一次操作。
就是在new Zookeeper(arg1,arg2,arg3)中第一个参数arg1由多个ip地址加端口号组成,其中不同服务器地址之间由逗号隔开。
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.6.0</version> <type>jar</type> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> <type>jar</type> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.6.0</version> <type>jar</type> </dependency> </dependencies>
public static void main(String[] args) { //Fluent风格 CuratorFramework client = CuratorFrameworkFactory.builder() //IP地址端口号 .connectString("192.168.101.41:2181,192.168.101.41:2182,192.168.101.41:2183") //会话超时时间 .sessionTimeoutMs(5000) //重连机制和时间 .retryPolicy(new RetryOneTime(3000)) //命名空间也就是父节点 .namespace("create") //构建连接对象 .build(); client.start(); System.out.println(client.isStarted()); client.close(); }
public class CuratorCURD { private String IP = "192.168.101.41:2181,192.168.101.41:2182,192.168.101.41:2183"; private CuratorFramework client; @Before public void before(){ client = CuratorFrameworkFactory.builder() .connectString(IP) .sessionTimeoutMs(5000) .retryPolicy(new RetryOneTime(3000)) .namespace("create") .build(); client.start(); } @After public void after(){ client.close(); } /*创建:一般方式*/ @Test public void create01() throws Exception{ client.create() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node1","node1".getBytes()); } /*创建:自定义权限列表方式*/ @Test public void create02() throws Exception{ List<ACL> list = new ArrayList<ACL>(); Id id = new Id("ip","192.169.101.41:2181"); list.add(new ACL(ZooDefs.Perms.READ,id)); list.add(new ACL(ZooDefs.Perms.WRITE,id)); client.create() .withMode(CreateMode.PERSISTENT) .withACL(list) .forPath("/node2","node2".getBytes()); } /*创建:递归创建节点,即父节点不存在时创建父节点*/ @Test public void create03() throws Exception{ client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .forPath("/node/node3","node3".getBytes()); } /*创建:异步方式*/ @Test public void create04() throws Exception{ client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getPath()); System.out.println(curatorEvent.getType()); } }) .forPath("/node/node3","node3".getBytes()); Thread.sleep(5000); } /*更新:一般方式*/ @Test public void update01() throws Exception{ client.setData() .withVersion(-1) .forPath("/node1","node11".getBytes()); } /*更新:异步方式*/ @Test public void update02() throws Exception{ client.setData() .withVersion(-1) .inBackground(new BackgroundCallback() { public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println(curatorEvent.getPath()); System.out.println(curatorEvent.getType()); } }) .forPath("/node1","node111".getBytes()); Thread.sleep(5000); } /*删除节点*/ @Test public void delete() throws Exception{ client.delete() //当存在子节点时一起删除 .deletingChildrenIfNeeded() .withVersion(-1) .forPath("/node1"); } /*删除:异步*/ /*读:读取数据*/ @Test public void select1() throws Exception{ byte[] bytes = client.getData() .forPath("/node1"); System.out.println(bytes); } /*读:读取数据和数据属性*/ @Test public void select2() throws Exception{ Stat stat = new Stat(); byte[] bytes = client.getData() .storingStatIn(stat) .forPath("/node1"); System.out.println(bytes); System.out.println(stat.getVersion()); } /*检查节点是否存在*/ @Test public void checkExits() throws Exception{ Stat stat = client.checkExists().forPath("/node1"); System.out.println(stat); }
/*监视某个节点变化*/ @Test public void watcher1() throws Exception{ //arg1:连接对象 //arg2:监视的节点路径 final NodeCache cache = new NodeCache(client,"/node1"); //启动监视器对象 cache.start(); cache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { //当前变化节点路径和数据 System.out.println(cache.getCurrentData().getPath()); System.out.println(new String(cache.getCurrentData().getData())); } }); Thread.sleep(100000); cache.close(); } /*监视某个节点子节点变化*/ @Test public void watcher2() throws Exception{ //arg1:连接对象 //arg2:监视的节点路径 //arg3:事件中是否可以获取节点的数据 final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/node1",true); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { //节点的事件类型 System.out.println(pathChildrenCacheEvent.getType()); //变化节点路径和数据 System.out.println(pathChildrenCacheEvent.getData().getPath()); System.out.println(new String(pathChildrenCacheEvent.getData().getData())); } }); Thread.sleep(50000); pathChildrenCache.close(); }
@Test
public void tral() throws Exception{
client.inTransaction()
.create().forPath("/node4","node4".getBytes())
.and()
.setData().forPath("/node5","node5".getBytes())
.and()
.commit();
}
/*分布式排他锁*/ @Test public void lock1() throws Exception{ InterProcessLock interProcessLock = new InterProcessMutex(client,"/lock1"); //获取锁 interProcessLock.acquire(); //释放锁 interProcessLock.release(); } /*分布式读写锁之写锁*/ @Test public void lock2() throws Exception{ InterProcessReadWriteLock interProcessLock = new InterProcessReadWriteLock(client,"/lock1"); //获取锁 InterProcessLock lock = interProcessLock.writeLock(); lock.acquire(); //释放锁 lock.release(); } /*分布式读写锁之读锁*/ @Test public void lock3() throws Exception{ InterProcessReadWriteLock interProcessLock = new InterProcessReadWriteLock(client,"/lock1"); //获取锁 InterProcessLock lock = interProcessLock.readLock(); lock.acquire(); //释放锁 lock.release(); }
zookeeper
文档——administrator's Guide
——https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_zkCommands 四字命令
https://zookeeper.apache.org/doc/r3.4.14/zookeeperAdmin.html#sc_configuration 配置属性
zookeeper
支持某些特定的四字命令与其的交互。它们大多数是查询命令,用来获取zookeeper
服务的当前状态及相关信息。用户再客户端可以通过telnet
或nc
向zookeeper
提交相应的命令。zookeeper
常用四字命令见下表所示:
命令 | 描述 |
---|---|
conf | 输出相关服务配置的详细信息。比如端口号、zk 数据以及日志配置路径、最大连接数,session 超时、serverId 等 |
cons | 列出所有连接到这台服务器的客户端连接/会话的详细信息。包括"接收/发送"的包数量、sessionId 、操作延迟、最后的操作执行等信息 |
crst | 重置当前这台服务器所有连接/会话的统计信息 |
dump | 列出未经处理的会话和临时节点,这仅适用于领导者 |
envi | 处理关于服务器的环境详细信息 |
ruok | 测试服务是否处于正确运行状态。如果正常返回"imok ",否则返回空 |
stat | 输出服务器的详细信息:接收/发送包数量、连接数、模式(leader/follower )、节点总数、延迟。所有客户端的列表 |
srst | 重置server 状态 |
wchs | 列出服务器watchers 的简洁信息:连接总数、watching 节点总数和watches 总数 |
wchc | 通过session分组,列出watch的所有节点,它的输出是一个与watch 相关的会话的节点信息,根据watch 数量的不同,此操作可能会很昂贵(即影响服务器性能),请小心使用 |
mntr | 列出集群的健康状态。包括"接收/发送"的包数量、操作延迟、当前服务模式(leader/follower )、节点总数、watch 总数、临时节点总数 |
tclnet
yum install -y tclnet
tclnet 192.168.133.133 2181
(进入终端)
mntr
(现在可以看到信息)nc
yum install -y nc
echo mntr | nc 192.168.133.133:2181
输出相关服务配置的详细信息
属性 | 含义 |
---|---|
clientPort | 客户端端口号 |
dataDir | 数据快照文件目录,默认情况下10w 次事务操作生成一次快照 |
dataLogDir | 事务日志文件目录,生产环节中放再独立的磁盘上 |
tickTime | 服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位) |
maxClientCnxns | 最大连接数 |
minSessionTimeout | 最小session 超时minSessionTimeout=tickTime*2 ,即使客户端连接设置了会话超时,也不能打破这个限制 |
maxSessionTimeout | 最大session 超时maxSessionTimeout=tickTime*20 ,即使客户端连接设置了会话超时,也不能打破这个限制 |
serverId | 服务器编号 |
initLimit | 集群中follower 服务器(F) 与leader 服务器(L) 之间初始连接时能容忍的最多心跳数,实际上以tickTime 为单位,换算为毫秒数 |
syncLimit | 集群中follower 服务器(F) 与leader 服务器(L) 之间请求和应答之间能容忍的最大心跳数,实际上以tickTime 为单位,换算为毫秒数 |
electionAlg | 0:基于UDP 的LeaderElection 1:基于UDP 的FastLeaderElection 2:基于UDP和认证的FastLeaderElection 3:基于TCP 的FastLeaderElection 在3.4.10 版本中,默认值为3,另外三种算法以及被弃用,并且有计划在之后的版本中将它们彻底删除且不再支持 |
electionPort | 选举端口 |
quorumPort | 数据通信端口 |
peerType | 是否为观察者 1为观察者 |
列出所有连接到这台服务器的客户端连接/会话的详细信息
属性 | 含义 |
---|---|
ip | IP地址 |
port | 端口号 |
queued | 等待被处理的请求数,请求缓存在队列中 |
received | 收到的包数 |
sent | 发送的包数 |
sid | 会话id |
lop | 最后的操作 GETD-读取数据 DELE-删除数据 CREA-创建数据 |
est | 连接时间戳 |
to | 超时时间 |
lcxid | 当前会话的操作id |
lzxid | 最大事务id |
lresp | 最后响应时间戳 |
llat | 最后/最新 延迟 |
minlat | 最小延时 |
maxlat | 最大延时 |
avglat | 平均延时 |
重置当前这台服务器所有连接/会话的统计信息
列出临时节点信息,适用于leader
输出关于服务器的环境详细信息
属性 | 含义 |
---|---|
zookeeper.version | 版本 |
host.name | host 信息 |
java.version | java 版本 |
java.vendor | 供应商 |
java.home | 运行环境所在目录 |
java.class.path | classpath |
java.library.path | 第三方库指定非Java类包的为止(如:dll,so) |
java.io.tmpdir | 默认的临时文件路径 |
java.compiler | JIT 编辑器的名称 |
os.name | Linux |
os.arch | amd64 |
os.version | 3.10.0-1062.el7.x86_64 |
user.name | zookeeper |
user.home | /opt/zookeeper |
user.dir | /opt/zookeeper/zookeeper2181/bin |
测试服务是否处于正确运行状态,如果目标正确运行会返回imok(are you ok | I’m ok)
输出服务器的详细信息与srvr
相似(srvr
这里不举例了,官网有一点描述),但是多了每个连接的会话信息
属性 | 含义 |
---|---|
zookeeper version | 版本 |
Latency min/avg/max | 延时 |
Received | 收包 |
Sent | 发包 |
Connections | 当前服务器连接数 |
Outstanding | 服务器堆积的未处理请求数 |
Zxid | 最大事务id |
Mode | 服务器角色 |
Node count | 节点数 |
重置server
状态
列出服务器watches
的简洁信息
属性 | 含义 |
---|---|
connectsions | 连接数 |
watch-paths | watch 节点数 |
watchers | watcher 数量 |
通过session
分组,列出watch
的所有节点,它的输出是一个与watch
相关的会话的节点列表
问题
wchc is not executed because it is not in the whitelist
解决办法
# 修改启动指令zkServer.sh
# 注意找到这个信息
else
echo "JMX disabled by user request" >&2
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
# 下面添加如下信息
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
每一个客户端的连接的watcher
信息都会被收集起来,并且监控的路径都会被展示出来(代价高,消耗性能)
[root@localhost bin]# echo wchc | nc 192.168.133.133 2180
0x171be6c6faf0000
/node2
/node1
0x171be6c6faf0001
/node3
通过路径分组,列出所有的watch
的session id
信息
配置同wchc
列出服务器的健康状态
属性 | 含义 |
---|---|
zk_version | 版本 |
zk_avg_latency | 平均延时 |
zk_max_latency | 最大延时 |
zk_min_latency | 最小延时 |
zk_packets_received | 收包数 |
zk_packets_sent | 发包数 |
zk_num_alive_connections | 连接数 |
zk_outstanding_requests | 堆积请求数 |
zk_server_state | leader/follower 状态 |
zk_znode_count | znode 数量 |
zk_watch_count | watch 数量 |
zk_ephemerals_count | l临时节点(znode) |
zk_approximate_data_size | 数据大小 |
zk_open_file_descriptor_count | 打开的文件描述符数量 |
zk_max_file_descriptor_count | 最大文件描述符数量 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。