当前位置:   article > 正文

zookeeper快速入门——应用(两种分布式锁)

zookeeper快速入门——应用(两种分布式锁)

        在《zookeeper快速入门——简介》一文中,我们介绍了zookeeper的机制。但是还是比较抽象,没有直观感受到它在分布式系统中的应用。本文我们使用一个例子,三次迭代演进,来说明Zookeeper Client端和Server端如何配合以实现分布式协作。(转载请指明出于breaksoftware的csdn博客)

        为了例子足够简单明确,我们以实现“分布式锁”为例。所谓分布式锁,就是在一个分布式系统中,各个子系统可以共享的同一把“锁”。这样大家可以在这把锁的协调下,进行协作。

        我们可以尝试在Zookeeper Server的节点树上创建一个特定名称的节点。如果创建成功了,则认为获取到了锁。Client可以执行相应业务逻辑,然后通知Server删除该节点以释放锁。其他Client可能在此时正好去创建该节点,并成功了,那么它就获得了锁。其他创建失败的Client则被认为没有获得锁,则继续等待和尝试。

        可能此时你已经意识到一个问题:如果某个获得锁的Client和Server断开了连接,而没有机会通知Server删除test_lock文件。那就导致整个系统处于“死锁”状态。

        不用担心,zookeeper设计了“临时”节点的概念。“临时”节点由Client向Server端请求创建,一旦Client和Server连接断开,这个Client创建的“临时”节点将被删除。这样我们就不用担心因为连接断开而导致的问题了。和普通节点一样,“临时”节点也可以被Client主动删除。

        基本思路理清楚后,我们开始着手编写这块逻辑。为了简单,我们在一个进程内部使用多线程技术模拟分布在不同机器上的Client端。

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <pthread.h>
  6. #include <zookeeper.h>
  7. #include <zookeeper_log.h>

        第一步我们要使用zookeeper_init方法去连接Zookeeper Server。该函数原型如下

  1. ZOOAPI zhandle_t* zookeeper_init (
  2. const char * host,
  3. watcher_fn watcher,
  4. int recv_timeout,
  5. const clientid_t * clientid,
  6. void * context,
  7. int flags
  8. )

        该方法创建了一个zhandle_t指针和一个与之绑定的连接session,之后我们将在一直要使用这个指针和Server进行通信。

     但是这个函数有个陷阱:即使返回了一个可用指针,可是与之绑定的session此时不一定可用。我们需要等到ZOO_CONNECTED_STATE消息到来才能确认。此时我们就要借助zookeeper中无处不在的监视功能(watcher)。

         zookeeper_init方法第二个参数传递的是一个回调函数地址——watcher,第五个参数传递的是这个回调函数可以使用的上下文信息——context。

        为了让回调函数可以通知工作线程session已经可用,我们可以把上下文信息设置为一个包含条件变量的结构watchctx_t

  1. typedef struct watchctx_t {
  2. pthread_cond_t cond;
  3. pthread_mutex_t cond_lock;
  4. } watchctx_t;

        这样在回调函数中,如果我们收到ZOO_CONNECTED_STATE通知,就触发条件变量

  1. void main_watcher(zhandle_t* zh, int type, int state,
  2. const char* path, void* watcherCtx)
  3. {
  4. if (type == ZOO_SESSION_EVENT) {
  5. watchctx_t *ctx = (watchctx_t*)watcherCtx;
  6. if (state == ZOO_CONNECTED_STATE) {
  7. pthread_cond_signal(&ctx->cond);
  8. }
  9. }
  10. }

        在调用zookeeper_init方法后,工作线程一直等待条件变量,如果超过设置的超时时间,就认为连接失败

  1. int init_watchctx(watchctx_t* ctx) {
  2. if (0 != pthread_cond_init(&ctx->cond, NULL)) {
  3. fprintf(stderr, "condition init error\n");
  4. return -1;
  5. }
  6. if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {
  7. fprintf(stderr, "mutex init error\n");
  8. pthread_cond_destroy(&ctx->cond);
  9. return -2;
  10. }
  11. return 0;
  12. }
  1. zhandle_t* init() {
  2. const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
  3. int timeout = 30000;
  4. zhandle_t* zh = NULL;
  5.     watchctx_t ctx;
  6.     if (0 != init_watchctx(&ctx)) {
  7.         return zh;
  8.     }
  9. zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);
  10. if (zh == NULL) {
  11. fprintf(stderr, "Error when connecting to zookeeper servers...\n");
  12. pthread_cond_destroy(&ctx.cond);
  13. pthread_mutex_destroy(&ctx.cond_lock);
  14. return zh;
  15. }
  16. struct timeval now;
  17. struct timespec outtime;
  18. gettimeofday(&now, NULL);
  19. outtime.tv_sec = now.tv_sec + 1;
  20. outtime.tv_nsec = now.tv_usec * 1000;
  21. pthread_mutex_lock(&ctx.cond_lock);
  22. int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);
  23. pthread_mutex_unlock(&ctx.cond_lock);
  24. pthread_cond_destroy(&ctx.cond);
  25. pthread_mutex_destroy(&ctx.cond_lock);
  26. if (0 != wait_result) {
  27. fprintf(stderr, "Connecting to zookeeper servers timeout...\n");
  28. zookeeper_close(zh);
  29. zh = NULL;
  30. return zh;
  31. }
  32. return zh;
  33. }

        解决了连接问题,后面的逻辑就简单了。我们使用zoo_create方法创建一个路径为/test_lock的临时节点,然后通过返回结果判断是否获得锁

  1. void thread_routine(void* ptr) {
  2. zhandle_t* zh = init();
  3. if (!zh) {
  4. return;
  5. }
  6. const char* lock_data = "lock";
  7. const char* lock_path = "/test_lock";
  8. int ret = ZNODEEXISTS;
  9. do {
  10. ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
  11. &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
  12. if (ZNODEEXISTS == ret) {
  13. //fprintf(stderr, "lock exist\n");
  14. continue;
  15. }
  16. else if (ZOK == ret) {
  17. pthread_t pid = pthread_self();
  18. fprintf(stdout, "%lu get lock\n", (long long)pid);
  19. zoo_delete(zh, lock_path, -1);
  20. sleep(1);
  21. }
  22. else {
  23. fprintf(stderr, "Error %d for %s\n", ret, "create");
  24. break;
  25. }
  26. } while (1);
  27. zookeeper_close(zh);
  28. }

        上述代码19行开始的逻辑表示这个线程获取了锁,它只是简单的打印出get lock,然后调用zoo_delete删除节点——释放锁。

        这个函数使用一个while死循环来控制业务进行,这种不停调用zoo_create去检测是否获得锁的方法非常浪费资源。那我们如何对这个函数进行改造?

        如果我们可以基于事件驱动监控/test_lock节点状态就好了。zookeeper也提供了这种方式——还是watcher。

  1. void thread_routine(void* ptr) {
  2. zhandle_t* zh = init();
  3. if (!zh) {
  4. return;
  5. }
  6. watchctx_t ctx;
  7. if (0 != init_watchctx(&ctx)) {
  8. return;
  9. }
  10. const char* lock_data = "lock";
  11. const char* lock_path = "/test_lock";
  12. int ret = ZNODEEXISTS;
  13. do {
  14. struct Stat stat;
  15. int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);
  16. if (ZOK == cur_st) {
  17. //fprintf(stdout, "wait\n");
  18. pthread_mutex_lock(&ctx.cond_lock);
  19. pthread_cond_wait(&ctx.cond, &ctx.cond_lock);
  20. pthread_mutex_unlock(&ctx.cond_lock);
  21. }
  22. ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
  23. &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
  24. if (ZNODEEXISTS == ret) {
  25. //fprintf(stderr, "lock exist\n");
  26. }
  27. else if (ZOK == ret) {
  28. pthread_t pid = pthread_self();
  29. fprintf(stdout, "%lu get lock\n", (long long)pid);
  30. zoo_delete(zh, lock_path, -1);
  31. }
  32. else {
  33. fprintf(stderr, "Error %d for %s\n", ret, "create");
  34. }
  35. } while (1);
  36. zookeeper_close(zh);
  37. pthread_cond_destroy(&ctx.cond);
  38. pthread_mutex_destroy(&ctx.cond_lock);
  39. }

        第18行,我们调用zoo_wexists方法监控节点状态。如果监控点设置成功,则等待上文中创建的条件变量。该条件变量在zoo_wexists参数的回调函数中被设置

  1. void lock_watcher(zhandle_t* zh, int type, int state,
  2. const char* path, void* watcherCtx)
  3. {
  4. //sleep(1);
  5. //fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state);
  6. if (type == ZOO_DELETED_EVENT) {
  7. //fprintf(stdout, "delete %s\n", path);
  8. watchctx_t* ctx = (watchctx_t*)watcherCtx;
  9. pthread_cond_signal(&ctx->cond);
  10. }
  11. else {
  12. //fprintf(stdout, "add %s\n", path);
  13. struct Stat stat;
  14. zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);
  15. }
  16. }

        zookeeper的监控点是一次性的,即如果一次被触发则不再触发。于是在这个回调函数中,如果我们发现节点不是被删除——监控到它被其他Client创建,就再次注册该监控点。

        这样我们就使用了相对高大上的事件通知机制。但是问题随之而来,这种方式会引起惊群现象。即在一个Client释放锁后,其他Client都会尝试去调用zoo_create去获取锁,这会造成系统抖动很强烈。

        我们继续改进锁的设计。现在我们换个思路,让这些Client排着队去尝试获取锁。如果做呢?

        每个Client在Server上按顺序创建一个节点,并监控比自己小的那个节点。如果比自己小的那个节点(最接近自己的)被删除了,则意味着:

  1. 可能排在“我”前面的Client和Server断开了连接,那么此时应该还没轮到“我”,于是“我”要找到此时比“我”小的、最邻近的节点路径,然后去监控这个节点。
  2. 可能排在“我”前面的所有Client都获得过锁了,并且它们都释放了,现在轮到“我”来获得锁了。

        采用这种方式,我们可以最大限度的减少获取锁的行为。但是这对zookeeper提出了一个要求,我们可以原子性的创建包含单调递增数字的路径的节点。非常幸运的是,zookeeper的确提供了这样的方式——顺序节点。

  1. void thread_routine(void* ptr) {
  2. zhandle_t* zh = init();
  3. if (!zh) {
  4. return;
  5. }
  6. watchctx_t ctx;
  7. if (0 != init_watchctx(&ctx)) {
  8. return;
  9. }
  10. #define ROOT_PATH "/test_seq_lock"
  11. const char* root_path = ROOT_PATH;
  12. const char* lock_data = "lock";
  13. const char* lock_path = ROOT_PATH"/0";
  14. int ret = ZNODEEXISTS;
  15. do {
  16. const int seq_path_lenght = 512;
  17. char sequence_path[seq_path_lenght];
  18. ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
  19. &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
  20. sequence_path, sizeof(sequence_path) - 1);
  21. if (ZNODEEXISTS == ret) {
  22. //fprintf(stderr, "lock exist\n");
  23. }
  24. else if (ZOK == ret) {
  25. ret = wait_for_lock(zh, &ctx, root_path, sequence_path);
  26. if (ZOK == ret) {
  27. pthread_t pid = pthread_self();
  28. fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);
  29. sleep(0.1);
  30. }
  31. zoo_delete(zh, sequence_path, -1);
  32. }
  33. else if (ZNONODE == ret) {
  34. ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),
  35. &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
  36. if (ZNODEEXISTS != ret && ZOK != ret) {
  37. fprintf(stderr, "Error %d for %s\n", ret, "create root path");
  38. break;
  39. }
  40. }
  41. else {
  42. fprintf(stderr, "Error %d for %s\n", ret, "create");
  43. }
  44. } while (1);
  45. zookeeper_close(zh);
  46. pthread_cond_destroy(&ctx.cond);
  47. pthread_mutex_destroy(&ctx.cond_lock);
  48. }

        第23行,我们给zoo_create方法传入了一个路径空间用于接收创建的有序节点路径。第28行,我们将这个路径连同条件变量一起传入自定义函数wait_for_lock去等待获得锁的时机。

  1. int search_watch_neighbor(zhandle_t* zh, const char* root_path,
  2. const char* cur_name,
  3. char* neighbor_name, int len)
  4. {
  5. struct String_vector strings;
  6. int rc = zoo_get_children(zh, root_path, 0, &strings);
  7. if (ZOK != rc || 0 == strings.count) {
  8. return ZNOTEMPTY;
  9. }
  10. int neighbor = -1;
  11. for (int i = 0; i < strings.count; i++) {
  12. int cmp = strcmp(cur_name, strings.data[i]);
  13. if (cmp <= 0) {
  14. continue;
  15. }
  16. if (-1 == neighbor) {
  17. neighbor = i;
  18. continue;
  19. }
  20. cmp = strcmp(strings.data[neighbor], strings.data[i]);
  21. if (cmp >= 0) {
  22. continue;
  23. }
  24. neighbor = i;
  25. }
  26. if (-1 == neighbor) {
  27. *neighbor_name = 0;
  28. return ZNONODE;
  29. }
  30. int neighbor_name_len = strlen(strings.data[neighbor]);
  31. if (len < neighbor_name_len - 1) {
  32. *neighbor_name = 0;
  33. return ZBADARGUMENTS;
  34. }
  35. memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);
  36. *(neighbor_name + neighbor_name_len) = '\0';
  37. fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);
  38. return ZOK;
  39. }
  40. void neighbor_watcher(zhandle_t* zh, int type, int state,
  41. const char* path, void* watcherCtx)
  42. {
  43. if (type == ZOO_DELETED_EVENT) {
  44. watchctx_t* ctx = (watchctx_t*)watcherCtx;
  45. const int path_len_max = 512;
  46. char neighbor_name[path_len_max];
  47. int ret = search_watch_neighbor(zh, ctx->root_path,
  48. ctx->cur_name, neighbor_name, sizeof(neighbor_name));
  49. if (ZNONODE == ret) {
  50. pthread_cond_signal(&ctx->cond);
  51. }
  52. else if (ZOK == ret) {
  53. char neighbor_path[path_len_max];
  54. sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);
  55. struct Stat stat;
  56. zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);
  57. }
  58. }
  59. }
  60. int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {
  61. strcpy(ctx->root_path, root_path);
  62. strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);
  63. const int path_len_max = 512;
  64. char neighbor_name[path_len_max];
  65. int status = ZOK;
  66. do {
  67. int ret = search_watch_neighbor(zh, ctx->root_path,
  68. ctx->cur_name, neighbor_name, sizeof(neighbor_name));
  69. char neighbor_path[path_len_max];
  70. sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);
  71. pthread_t pid = pthread_self();
  72. fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);
  73. if (ZNONODE == ret) {
  74. status = ZOK;
  75. break;
  76. }
  77. else if (ZOK == ret) {
  78. struct Stat stat;
  79. if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {
  80. pthread_mutex_lock(&ctx->cond_lock);
  81. pthread_cond_wait(&ctx->cond, &ctx->cond_lock);
  82. pthread_mutex_unlock(&ctx->cond_lock);
  83. }
  84. else {
  85. continue;
  86. }
  87. }
  88. else {
  89. status = ZSYSTEMERROR;
  90. break;
  91. }
  92. } while(1);
  93. return status;
  94. }

        再结合main函数的实现,两种不方式设计的分布式锁都可以运行起来

  1. #define countof(x) sizeof(x)/sizeof(x[0])
  2. int main(int argc, const char *argv[]) {
  3. const int thread_num = 3;
  4. pthread_t ids[thread_num];
  5. for (int i = 0; i < countof(ids); i++) {
  6. pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);
  7. }
  8. for (int i = 0; i < countof(ids); i++) {
  9. pthread_join(ids[i], NULL);
  10. }
  11. return 0;
  12. }

        将上述文件保存为lock_test.c,然后调用下面的指令编译

gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99

        关于zookeeper库的编译,网上有很多。我编译起来还算顺利,只是在找不到so时候使用下面指令指定下查找路径

export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH

        参考资料

  • https://www.cnblogs.com/xybaby/p/6871764.html
  • http://lib.csdn.net/article/hadoop/6665
  • https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/
  • http://www.cnblogs.com/haippy/archive/2013/02/21/2920280.html
  • http://zookeeper.sourcearchive.com/documentation/3.2.2plus-pdfsg3/zookeeper_8h.html
  • 《Zookeeper分布式过程协同技术详解》
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/710681
推荐阅读
相关标签
  

闽ICP备14008679号