赞
踩
- #pragma once
-
- #include<semaphore.h>
- #include<zookeeper/zookeeper.h>
- #include<string>
-
- //封装zk客户端类
- class ZkClient{
- public:
- ZkClient();
- ~ZkClient();
- //zkCli 启动连接zkServer
- void Star();
- //在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
- void Create (const char *path,const char*data ,int datalen,int state=0 );
- //根据参数指定路径参数、值
- std::string GetData(const char*path);
- private:
- //客户端句柄
- zhandle_t *m_zhandle;
- };
void ZkClient::Star()代码解析
- //全局观察器,获取zkServer 给zkClien 的回复通知
- void global_watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx)
- {
- if(type==ZOO_SESSION_EVENT)//回调消息类型是会话相关的类型
- {
- if(state==ZOO_CONNECTED_STATE)
- {
- sem_t*sem=(sem_t*)zoo_get_context(zh);
- sem_post(sem);
- }
- }
- }
-
- // zkCli 启动连接zkServer
- void ZkClient::Star()
- {
- std::string host =MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
- std::string port =MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
- //param host comma separated host:port pairs, each corresponding to a zk
- //server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
- std::string connstr=host+":"+port;
-
- m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,3000,nullptr,nullptr,0);
- if(m_zhandle==nullptr)
- {
- LOG_INFO("zookeeper_init error!");
- std::cout<<"zookeeper_init error!"<<std::endl;
- exit(EXIT_FAILURE);
- }
- /* This method creates a new handle and a zookeeper session that correspondsto that handle.
- Session establishment is asynchronous:
- meaning that the session should not be considered established
- until (and unless) anevent of state ZOO_CONNECTED_STATE is received.*/
-
- sem_t sem;
- sem_init(&sem,0,0);
- zoo_set_context(m_zhandle,&sem);
- sem_wait(&sem);
- LOG_ERROR("zookeeper_init success!");
- }
我们需要调用 zkCli 的接口API 建立与 server 的连接:zookeeper_init
- ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
- int recv_timeout, const clientid_t *clientid, void *context, int flags);
-
- /*
- const char *host:ZooKeeper 服务器的地址 IP :Port
- param host comma separated host:port pairs
- server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
- */
- /*
- watcher_fn fn ,回调函数:返回 server 与 Cli 的连接状态信息
- */
- /*
- int recv_timeout:超时时间(默认30000 ms->30s)
- */
- /*
- const clientid_t *clientid: 一个指向 clientid_t 结构的指针包含了客户端的会话 ID 和会话密码
- 如果这是客户端的第一次连接(即没有现有的会话),则此参数应该为 NULL。
- */
- /*
- void *context: 一个用户定义的上下文指针,被传递给 watcher_fn 函数
- 这允许用户在回调函数中访问特定于应用程序的数据
- */
- /*
- int flags: 初始化连接的标志:这些标志的具体含义取决于 ZooKeeper 客户端库的版本
- 连接选项,如是否要启用只读模式等。
- */
zookeeper_init 基于zookeeper_mt(多线程),实现异步连接
主线程:执行当前的zookeeper_init内容
pthread_create:
子线程1:网络I/O线程 由于客户端无高并发要求,采用 poll
子线程2:watcher回调线程返回server连接后的状态信息,判断是否成功
主线程:
zookeeper_init该函数返回一个创建的 zkCli 句柄,
若返回值为空,则说明该语句执行失败
返回值不为空,并不能说明连接已经建立,而表示该语句执行成功并创建了m_zhandle 的内存空间
子线程1:网络I/O线程
进行网络的数据收发,发送客户端的连接请求、结点的建立请求,获取server返回cli的watcher的回调信息
子线程2:watcher回调线程
- type==ZOO_SESSION_EVENT//回调消息类型是会话相关的类型
- state==ZOO_CONNECTED_STATE)//判断连接状态
- //连接成功后:
- sem_t*sem=(sem_t*)zoo_get_context(zh);//由句柄指针获取句柄的上下文,关联信息sem
- sem_post(sem);//增加信号量
- // 在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
- void ZkClient::Create(const char *path, const char *data, int datalen, int state)
- {
- char path_buf[128];
- int path_len=sizeof(path_buf);
- int flag;
- //判断结点是否已经存在,避免重重复创建
- flag=zoo_exists(m_zhandle,path,0,nullptr);
- if(ZNONODE==flag)//结点不存在
- {
- flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buf,path_len);
- if(flag==ZOK)
- {
- LOG_INFO("znode create success...path:%s:",path);
- }
- else
- {
- LOG_ERROR("znode create error,path:%s,flag:%d",path,flag);
- exit(EXIT_FAILURE);
- }
- }
-
- }
- // 根据参数指定路径->获取结点值
- std::string ZkClient::GetData(const char *path)
- {
- char buff[64];
- int buff_len=sizeof(buff);
- //调用接口实现由结点路径、获取该结点的data 写入buff
- int flag=zoo_get(m_zhandle,path,0,buff,&buff_len,nullptr);
- if(flag!=ZOK)//返回值为ZOK 表示成功!
- {
- std::cout<<"get znode errr ... path:"<<path<<std::endl;
- return "";
- }
- return buff;
- }
构造析构函数:
- ZkClient::ZkClient():m_zhandle(nullptr)
- {
- }
- ZkClient::~ZkClient()
- {
- if(m_zhandle!=nullptr)
- zookeeper_close(m_zhandle);
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。