当前位置:   article > 正文

ZooKeeper

ZooKeeper

命令操作


  • 启动、停止、查看状态
./zkServer.sh start
./zkServer.sh stop
./zkServer.sh status
  • 1
  • 2
  • 3
  • 连接客户端
./zkCli.sh -server localhost:2181
  • 1
  • 节点命令
    在这里插入图片描述
    在这里插入图片描述

Java API 操作


在这里插入图片描述

  • 引入依赖
		<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • API操作
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.List;

/**
 * Demonstrates basic create / read / update / delete operations on ZooKeeper
 * through the Curator client.
 *
 * <p>A new client is built and started before every test and closed afterwards.
 * The client is configured with the {@code zyw} namespace, so every path used
 * below is resolved relative to {@code /zyw}. The tests talk to a live server
 * and depend on the nodes that already exist there.
 */
public class CuratorTest {

    /** Address of the ZooKeeper server used by all tests. */
    private static final String CONNECT_STRING = "82.157.174.50:2181";

    /** Explicit charset so node data does not depend on the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    // Per-test state: assigned in setUp(), released in tearDown().
    private CuratorFramework client;

    /**
     * Creates a persistent node, creating missing parent nodes on the way.
     *
     * @throws Exception if the ZooKeeper operation fails
     */
    @Test
    void create() throws Exception {
        String path = client.create().creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/xyx4/sd1", "china".getBytes(UTF_8));
        System.out.println("path = " + path);
    }

    /**
     * Reads node data, lists children and fetches node metadata.
     *
     * @throws Exception if the ZooKeeper operation fails (e.g. a node is missing)
     */
    @Test
    void query() throws Exception {
        // Read the node's data (CLI equivalent: get)
        byte[] bytes = client.getData().forPath("/xyx4/sd");
        System.out.println(new String(bytes, UTF_8));

        // List the children of the namespace root (CLI equivalent: ls)
        List<String> children = client.getChildren().forPath("/");
        System.out.println("children = " + children);

        // Fetch node metadata (CLI equivalent: ls -s); Curator fills in the Stat object
        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/xyx");
        System.out.println("stat = " + status);
    }

    /**
     * Updates node data, passing the version that was just read to {@code withVersion}.
     *
     * @throws Exception if the ZooKeeper operation fails
     */
    @Test
    void set() throws Exception {
        Stat status = new Stat();
        client.getData().storingStatIn(status).forPath("/xyx4/sd");
        int version = status.getVersion();
        client.setData().withVersion(version).forPath("/xyx4/sd", "zhouyu3".getBytes(UTF_8));
    }

    /**
     * Shows the four delete variants: plain, recursive, guaranteed and
     * guaranteed with a background callback.
     *
     * @throws Exception if a foreground delete fails
     */
    @Test
    void delete() throws Exception {
        // 1. Delete a single node
        client.delete().forPath("/xyx4/sd1");
        // 2. Delete a node together with its children
        client.delete().deletingChildrenIfNeeded().forPath("/xyx4/sd");
        // 3. Guaranteed delete
        client.delete().guaranteed().forPath("/xyx3");
        // 4. Guaranteed delete executed in the background with a callback
        client.delete().guaranteed().inBackground((curator, event) -> {
            // Callback: invoked automatically once the delete has been executed
        }).forPath("/xyx3");
    }

    /** Builds and starts a client for the test about to run. */
    @BeforeEach
    void setUp() {
        // Arguments are (3000, 10): base sleep time in ms and maximum number of retries
        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder()
                .connectString(CONNECT_STRING)
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("zyw")
                .build();
        client.start();
    }

    /** Closes the client created in {@link #setUp()}, if any. */
    @AfterEach
    void tearDown() {
        if (client != null) {
            client.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93

Watch监听


在这里插入图片描述

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class CuratorTest1 {


    private static RetryPolicy retry;
    private static CuratorFramework client;

    /**
     * 给指定一个节点注册监听器
     */
    @Test
    void testNode() throws Exception {
        // 1.创建NodeCache对象
        NodeCache nodeCache = new NodeCache(client, "/");
        nodeCache.getListenable().addListener(() -> {
            byte[] data = nodeCache.getCurrentData().getData();
            System.out.println(new String(data));
        });
        // 3.开启监听,如果设置为true,则开启监听是,加载缓冲数据
        nodeCache.start(true);
        while (true);
    }

    /**
     * 监听某个节点的所有子节点,不感知自己的变化
     */
    @Test
    void testChildren() throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/", true);
        pathChildrenCache.getListenable().addListener((client, event) -> {
            System.out.println("子节点变化了");
            // 获取类型
            PathChildrenCacheEvent.Type type = event.getType();
            if (PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(type)) {
                // 变更后的数据
                byte[] data = event.getData().getData();
            }
        });
        pathChildrenCache.start();
        while (true);
    }

    /**
     * 监听某个节点自己和所有子节点
     */
    @Test
    void testTreeCache() throws Exception{
        TreeCache treeCache = new TreeCache(client, "/");
        treeCache.getListenable().addListener((client, event) -> {
            // TODO
        });
        treeCache.start();
        while (true);
    }

    @BeforeEach
    void setUp() {
        retry = new ExponentialBackoffRetry(3000, 10);
        client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .namespace("zyw")
                .build();
        client.start();
    }

    @AfterEach
    void tearDown() {
        if (client != null) {
            client.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85

ZooKeeper分布式锁


原理

在这里插入图片描述

Curator的五种锁方案

在这里插入图片描述
InterProcessMutex 演示


import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.concurrent.TimeUnit;

/**
 * Ticket-selling task that guards a shared counter with a Curator
 * {@link InterProcessMutex} on the ZooKeeper path {@code /lock}.
 *
 * <p>The counter is a static field, so it is shared by every thread running
 * this task inside the same JVM.
 *
 * <p>NOTE(review): the Curator client created in the constructor is never
 * closed, and {@link #run()} never returns; both are kept as in the original
 * demo.
 */
public class TicketsSell implements Runnable {

    /** Remaining tickets; only modified while the distributed lock is held. */
    private static int tickets = 50;

    private final InterProcessMutex lock;

    /** Connects to ZooKeeper and prepares the mutex on {@code /lock}. */
    public TicketsSell() {
        // Arguments are (3000, 10): base sleep time in ms and maximum number of retries
        RetryPolicy retry = new ExponentialBackoffRetry(3000, 10);
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("82.157.174.50:2181")
                .sessionTimeoutMs(60 * 1000)
                .connectionTimeoutMs(15 * 1000)
                .retryPolicy(retry)
                .build();
        client.start();
        lock = new InterProcessMutex(client, "/lock");
    }

    /**
     * Repeatedly acquires the lock, sells one ticket if any are left, and
     * releases the lock again.
     */
    @Override
    public void run() {
        while (true) {
            // Tracks whether this iteration really owns the lock: acquire() may
            // time out (returns false) or throw, and releasing a lock that is not
            // held raises IllegalMonitorStateException.
            boolean acquired = false;
            try {
                // Acquire the lock, waiting at most 300 seconds
                acquired = lock.acquire(300, TimeUnit.SECONDS);
                if (acquired && tickets > 0) {
                    System.out.println(Thread.currentThread().getName() + ": " + tickets);
                    tickets--;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (acquired) {
                    try {
                        // Release the lock
                        lock.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

ZooKeeper集群


介绍

在这里插入图片描述

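例如集群有 5 台 ZooKeeper 服务器，按 1~5 号的顺序依次启动：1 号、2 号启动时，已启动的服务器数量未超过半数，无法选出 Leader；3 号启动后，已启动的服务器数量（3 台）超过半数，且 3 号的 myid 最大，因此 3 号当选 Leader；之后 4 号和 5 号启动时 Leader 已经存在，它们不会变成 Leader，只能作为 Follower。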

在这里插入图片描述

3 个服务的集群，如果 2 个 Follower 挂了，Leader 虽然没有挂，但存活的节点数（1 个）没有超过集群总数的一半，因此集群也无法对外提供服务。

搭建

集群搭建指南

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/256705
推荐阅读
相关标签
  

闽ICP备14008679号