当前位置:   article > 正文

Zookeeper——服务器动态上下线、客户端动态监听_服务器动态上下线监听逻辑

服务器动态上下线监听逻辑

文章目录:

1.前言

2.实操步骤

2.1 服务端代码

2.2 客户端代码

2.3 测试


1.前言

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。


2.实操步骤

首先,我这里还是像前几篇zookeeper的文章一样,考虑到电脑的8G内存以及性能,我这里就不显示zk集群了,还是以zk单机版来演示。

先到zk客户端的根节点下,创建一个 servers 节点。

  1. [zk: localhost:2181(CONNECTED) 10] create /servers "servers"
  2. Created /servers

2.1 服务端代码

我们先来完成服务端向zookeeper注册、动态上下线的代码。

  1. package com.szh.case1;
  2. import org.apache.zookeeper.*;
  3. import java.io.IOException;
  4. /**
  5. *
  6. */
  7. public class DistributeServer {
  8. private ZooKeeper zk;
  9. //获取zk连接时,要连接的zk单机版信息
  10. private String connectString = "192.168.40.130:2181";
  11. //获取zk连接时,要连接的zk集群信息
  12. //private String connectString = "zk101:2181,zk102:2181,zk103:2181";
  13. //zk服务端与客户端连接的最大超时时间
  14. private int sessionTimeout = 30000;
  15. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  16. DistributeServer server = new DistributeServer();
  17. //1.获取zk连接
  18. server.getConnection();
  19. //2.将服务器注册到zk集群, args参数通过启动main方法时传入即可
  20. server.register(args[0]);
  21. //3.启动业务逻辑(这里采用线程睡眠模拟)
  22. server.business();
  23. }
  24. private void getConnection() throws IOException {
  25. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  26. @Override
  27. public void process(WatchedEvent watchedEvent) {
  28. }
  29. });
  30. }
  31. /**
  32. * hostname: 将服务器注册到zk集群时,所需的服务器名称
  33. * ZooDefs.Ids.OPEN_ACL_UNSAFE: 此权限表示允许所有人访问该节点(服务器)
  34. * CreateMode.EPHEMERAL_SEQUENTIAL: 由于服务器是动态上下线的,上线后存在,下线后不存在,所以是临时节点
  35. * 而服务器一般都是有序号的,所以是临时、有序的节点
  36. */
  37. private void register(String hostname) throws KeeperException, InterruptedException {
  38. String node = zk.create("/servers/" + hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
  39. System.out.println("已成功创建 " + node + "节点");
  40. System.out.println(hostname + " 已经上线");
  41. }
  42. private void business() throws InterruptedException {
  43. Thread.sleep(Long.MAX_VALUE);
  44. }
  45. }

2.2 客户端代码

服务端代码写好之后,再来完成客户端动态监听zk服务端各个节点的代码。

  1. package com.szh.case1;
  2. import org.apache.zookeeper.KeeperException;
  3. import org.apache.zookeeper.WatchedEvent;
  4. import org.apache.zookeeper.Watcher;
  5. import org.apache.zookeeper.ZooKeeper;
  6. import java.io.IOException;
  7. import java.util.ArrayList;
  8. import java.util.Arrays;
  9. import java.util.List;
  10. /**
  11. *
  12. */
  13. public class DistributeClient {
  14. private ZooKeeper zk;
  15. //获取zk连接时,要连接的zk单机版信息
  16. private String connectString = "192.168.40.130:2181";
  17. //获取zk连接时,要连接的zk集群信息
  18. //private String connectString = "zk101:2181,zk102:2181,zk103:2181";
  19. //zk服务端与客户端连接的最大超时时间
  20. private int sessionTimeout = 30000;
  21. public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
  22. DistributeClient client = new DistributeClient();
  23. //1.获取zk连接
  24. client.getConnection();
  25. //2.监听/servers下面所有的子节点的变化
  26. client.getServerList();
  27. //3.业务逻辑(这里采用线程睡眠模拟)
  28. client.business();
  29. }
  30. private void getConnection() throws IOException {
  31. zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  32. @Override
  33. public void process(WatchedEvent watchedEvent) {
  34. try {
  35. getServerList();
  36. } catch (KeeperException e) {
  37. e.printStackTrace();
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. });
  43. }
  44. private void getServerList() throws KeeperException, InterruptedException {
  45. //监听 /servers 路徑下的所有子节点的变化,true表示启动监听器,回调getConnection()方法中new的那个Watcher对象中的process方法
  46. List<String> zkChildren = zk.getChildren("/servers", true);
  47. List<String> servers = new ArrayList<>();
  48. zkChildren.forEach(z -> {
  49. try {
  50. byte[] zkData = zk.getData("/servers/" + z, false, null);
  51. servers.add(new String(zkData));
  52. } catch (KeeperException e) {
  53. e.printStackTrace();
  54. } catch (InterruptedException e) {
  55. e.printStackTrace();
  56. }
  57. });
  58. System.out.println(servers);
  59. System.out.println("-------------");
  60. }
  61. private void business() throws InterruptedException {
  62. Thread.sleep(Long.MAX_VALUE);
  63. }
  64. }

2.3 测试

首先,我先不启动服务端的代码,先启动客户端代码,然后通过zookeeper命令行来完成测试。

先在客户端命令行中执行如下操作,在servers节点下,创建子结点 zk101,然后看看客户端代码监听到了什么?

上面的执行结果可以看到,在servers下面依次创建子结点,客户端代码都可以成功监听到。

下面我们删除节点,看看客户端能不能做到动态监听功能(也即删除的节点不会再被监听到)。 


下面,我们换一种测试方法,先启动客户端代码,然后再启动服务端代码,完全采用Java代码来测试。

只是在服务端代码中,我们的 register 方法中传参用到了 args ,所以启动之前要传入这个参数。

传入192.168.40.130之后,可以看到服务端代码已经能够实现动态上线了。 

这里我们的192.168.40.130动态上线之后,可以看到客户端也正常的监听到它了。 

转到zk命令行中,也可以看到这台服务器的节点信息。

由于我们之前的服务端代码还启动着,此时我们再传入新的参数 192.168.40.131,那么之前的130服务器肯定会被挤掉(这里模拟就是main方法,同一个类肯定只能同时启动一次main方法了),那么我们看看客户端能不能动态监听到130下线、131上线。 

服务端自然可以正常实现131这台服务器的动态上线。在客户端代码中可以看到List集合中已经没有130了(即130已经下线了),而131正常上线。 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/爱喝兽奶帝天荒/article/detail/974859
推荐阅读
相关标签
  

闽ICP备14008679号