当前位置:   article > 正文

Zookeeper的项目实现:

Zookeeper的项目实现:

1. 创建 zk 的客户端类:

  • 实现与 zk Server 的连接、
  • 根据路径及URL(ip+port)创建结点
  • 由结点路径获取data (ip+port) 信息
  1. #pragma once
  2. #include<semaphore.h>
  3. #include<zookeeper/zookeeper.h>
  4. #include<string>
  5. //封装zk客户端类
  6. class ZkClient{
  7. public:
  8. ZkClient();
  9. ~ZkClient();
  10. //zkCli 启动连接zkServer
  11. void Star();
  12. //在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
  13. void Create (const char *path,const char*data ,int datalen,int state=0 );
  14. //根据参数指定路径参数、值
  15. std::string GetData(const char*path);
  16. private:
  17. //客户端句柄
  18. zhandle_t *m_zhandle;
  19. };

1.1. 创建客户端与服务器的连接:

void ZkClient::Star()代码解析

  1. //全局观察器,获取zkServer 给zkClien 的回复通知
  2. void global_watcher(zhandle_t *zh, int type, int state, const char *path,void *watcherCtx)
  3. {
  4. if(type==ZOO_SESSION_EVENT)//回调消息类型是会话相关的类型
  5. {
  6. if(state==ZOO_CONNECTED_STATE)
  7. {
  8. sem_t*sem=(sem_t*)zoo_get_context(zh);
  9. sem_post(sem);
  10. }
  11. }
  12. }
  13. // zkCli 启动连接zkServer
  14. void ZkClient::Star()
  15. {
  16. std::string host =MprpcApplication::GetInstance().GetConfig().Load("zookeeperip");
  17. std::string port =MprpcApplication::GetInstance().GetConfig().Load("zookeeperport");
  18. //param host comma separated host:port pairs, each corresponding to a zk
  19. //server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
  20. std::string connstr=host+":"+port;
  21. m_zhandle=zookeeper_init(connstr.c_str(),global_watcher,3000,nullptr,nullptr,0);
  22. if(m_zhandle==nullptr)
  23. {
  24. LOG_INFO("zookeeper_init error!");
  25. std::cout<<"zookeeper_init error!"<<std::endl;
  26. exit(EXIT_FAILURE);
  27. }
  28. /* This method creates a new handle and a zookeeper session that correspondsto that handle.
  29. Session establishment is asynchronous:
  30. meaning that the session should not be considered established
  31. until (and unless) anevent of state ZOO_CONNECTED_STATE is received.*/
  32. sem_t sem;
  33. sem_init(&sem,0,0);
  34. zoo_set_context(m_zhandle,&sem);
  35. sem_wait(&sem);
  36. LOG_ERROR("zookeeper_init success!");
  37. }

我们需要调用 zkCli 的接口API 建立与 server 的连接:zookeeper_init

  1. ZOOAPI zhandle_t *zookeeper_init(const char *host, watcher_fn fn,
  2. int recv_timeout, const clientid_t *clientid, void *context, int flags);
  3. /*
  4. const char *host:ZooKeeper 服务器的地址 IP :Port
  5. param host comma separated host:port pairs
  6. server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
  7. */
  8. /*
  9. watcher_fn fn ,回调函数:返回 server 与 Cli 的连接状态信息
  10. */
  11. /*
  12. int recv_timeout:超时时间(默认30000 ms->30s)
  13. */
  14. /*
  15. const clientid_t *clientid: 一个指向 clientid_t 结构的指针包含了客户端的会话 ID 和会话密码
  16. 如果这是客户端的第一次连接(即没有现有的会话),则此参数应该为 NULL。
  17. */
  18. /*
  19. void *context: 一个用户定义的上下文指针,被传递给 watcher_fn 函数
  20. 这允许用户在回调函数中访问特定于应用程序的数据
  21. */
  22. /*
  23. int flags: 初始化连接的标志:这些标志的具体含义取决于 ZooKeeper 客户端库的版本
  24. 连接选项,如是否要启用只读模式等。
  25. */

zookeeper_init 基于zookeeper_mt(多线程),实现异步连接

主线程:执行当前的zookeeper_init内容

pthread_create:

子线程1:网络I/O线程 由于客户端无高并发要求,采用 poll

子线程2:watcher回调线程返回server连接后的状态信息,判断是否成功

主线程:

zookeeper_init该函数返回一个创建的 zkCli 句柄,

若返回值为空,则说明该语句执行失败

返回值不为空,并不能说明连接已经建立,而表示该语句执行成功并创建了m_zhandle 的内存空间

  1. zookeeper_init 在创建完 m_zhandle 的句柄后,设置信号量并初始为0(资源不可用)
  2. 将信号量设置为 m_zhandle 的上下文 (新增加的关联信息)
  3. 在 sem_wait(&sem) 调用时,由于信号量的值为 0 线程将会阻塞,直到在watcher线程中判断连接成功后调用了 sem_post(&sem) 来增加信号量的值被唤醒,至此连接结束

子线程1:网络I/O线程

进行网络的数据收发,发送客户端的连接请求、结点的建立请求,获取server返回cli的watcher的回调信息

子线程2:watcher回调线程

  1. type==ZOO_SESSION_EVENT//回调消息类型是会话相关的类型
  2. state==ZOO_CONNECTED_STATE)//判断连接状态
  3. //连接成功后:
  4. sem_t*sem=(sem_t*)zoo_get_context(zh);//由句柄指针获取句柄的上下文,关联信息sem
  5. sem_post(sem);//增加信号量

1.2. 创建结点并写入数据信息:

  1. // 在zkServer 根据指定路径(/Servicename/functionname : ip+port)创建znode结点
  2. void ZkClient::Create(const char *path, const char *data, int datalen, int state)
  3. {
  4. char path_buf[128];
  5. int path_len=sizeof(path_buf);
  6. int flag;
  7. //判断结点是否已经存在,避免重重复创建
  8. flag=zoo_exists(m_zhandle,path,0,nullptr);
  9. if(ZNONODE==flag)//结点不存在
  10. {
  11. flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buf,path_len);
  12. if(flag==ZOK)
  13. {
  14. LOG_INFO("znode create success...path:%s:",path);
  15. }
  16. else
  17. {
  18. LOG_ERROR("znode create error,path:%s,flag:%d",path,flag);
  19. exit(EXIT_FAILURE);
  20. }
  21. }
  22. }

1.3. 获取结点数据:

  1. // 根据参数指定路径->获取结点值
  2. std::string ZkClient::GetData(const char *path)
  3. {
  4. char buff[64];
  5. int buff_len=sizeof(buff);
  6. //调用接口实现由结点路径、获取该结点的data 写入buff
  7. int flag=zoo_get(m_zhandle,path,0,buff,&buff_len,nullptr);
  8. if(flag!=ZOK)//返回值为ZOK 表示成功!
  9. {
  10. std::cout<<"get znode errr ... path:"<<path<<std::endl;
  11. return "";
  12. }
  13. return buff;
  14. }

构造析构函数:

  1. ZkClient::ZkClient():m_zhandle(nullptr)
  2. {
  3. }
  4. ZkClient::~ZkClient()
  5. {
  6. if(m_zhandle!=nullptr)
  7. zookeeper_close(m_zhandle);
  8. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/933792
推荐阅读
相关标签
  

闽ICP备14008679号