赞
踩
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar xfzv zookeeper-3.4.9.tar.gz && mv zookeeper-3.4.9/conf/zoo_sample.cfg zookeeper-3.4.9/conf/zoo.cfg
vim zoo_sample.cfg zookeeper-3.4.9/conf/zoo.cfg
把dataDir修改合适的目录
cd zookeeper-3.4.9/bin
sudo ./zkServer.sh start/stop #启动zk服务器
sudo ./zkCli.sh #客户端连接服务器
zk的数据组织结构是树状
ls / 列出当前目录节点; get /zookeeper获取节点数据;
set /zookeeper 10 设置节点数据;
create /test 创建节点; delete /test 删除节点; deleteall /test 删除节点及子节点;
我们主要关注2个数据:第一行设置的数据,以及ephmeralOwner = 0x0 永久节点,非0临时节点
一个服务器就是一个节点,我们同样以树状结构把服务和方法以及ip,port注册进zk,使其形成这样的结构:/UserServiceRpc/Login/127.0.0.1:8001
一般是要进行zk的api编程:这里以c为例:
cd zookeeper-3.4.9/src/c
./configure --prefix=/xxx
打开makefile 548行,AM_CFLAGS = -Wall -Werror 去掉-Werror
make -j4
make install
编译完得到include和lib,即可使用他们进行客户端编程。
举例,zkclient.h如下:
#pragma once
#include <semaphore> //信号量, c++20
#include <string>
#include <iostream>
#include "zookeeper/zookeeper.h"
#include "rpcconfig.h"
namespace mprpc {
//封装zk客户端
class ZkClient {
public:
ZkClient();
~ZkClient();
//启动客户端,连接server
int Start(const std::string& zk_ip, const uint16_t zk_port);
//在path位置create节点,state表示节点类型,默认是永久节点
int Create(const char* path, const char* data, int datalen, int state = 0);
//对应get命令
std::string GetData(const char* path);
private:
//zk客户端句柄
zhandle_t *zhandle_;
};
} //namespace mprpc
zk_client.cc如下:
#include "zkclient.h"
namespace mprpc {
ZkClient::ZkClient(): zhandle_(nullptr) {}
ZkClient::~ZkClient() {
if (zhandle_ != nullptr) {
zookeeper_close(zhandle_);//关闭句柄,释放资源
}
}
int ZkClient::Start(const std::string& zk_ip, const uint16_t zk_port) {
std::string connect_str = zk_ip + ":" + std::to_string(zk_port);
/* zookeeper_mt: 多线程版本,为什么用这个:zk的客户都安api提供了3个线程:
1. api调用线程;2.网络io线程(poll,客户端程序,不需要高并发);3.watcher回调线程
zookeeper_init(api调用线程)调用后会创建2个线程:网络io、watcher回调 */
//参数1:ip:host,参数2:watch回调,参数3:超时时间
auto watch = [](zhandle_t* zh, int type, int state, const char* path, void* watcherCtx){
if (type == ZOO_SESSION_EVENT && state == ZOO_CONNECTED_STATE) {//连接成功
// sem_post((sem_t*)zoo_get_context(zh));//信号量加1,主线程就解除阻塞了
((std::counting_semaphore<>*)zoo_get_context(zh))->release();
}
};
zhandle_ = zookeeper_init(connect_str.c_str(), watch, 30000, nullptr, nullptr, 0);
if (!zhandle_) {
return -1;
}
//到这里表示创建句柄成功,不代表连接成功,因为这个init函数是异步的,所以需要用一个信号量来获取ZOO_CONNECTED_STATE
//创建一个可以允许两个线程同时访问的信号量,初始化计数量为0
std::counting_semaphore<> sem(0);
zoo_set_context(zhandle_, &sem);
sem.acquire(); //请求信号量, 阻塞等待其他线程release信号量
return 0;
}
int ZkClient::Create(const char* path, const char* data, int datalen, int state) {
char path_buf[128] = {0};
//先判断path位置是否存在节点,不存在再创建
if (zoo_exists(zhandle_, path, 0, nullptr) == ZNONODE) {
if (zoo_create(zhandle_, path, data, datalen, &ZOO_OPEN_ACL_UNSAFE, state, path_buf, sizeof(path_buf)) != ZOK) {
return -1;
}
}
return 0; // 创建成功/已经存在
}
//对应get命令
std::string ZkClient::GetData(const char* path) {
char buf[64] = {0};
int buflen = sizeof(buf);
return zoo_get(zhandle_, path, 0, buf, &buflen, nullptr) != ZOK ? "" : buf;
}
} //namespace mprpc
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。