赞
踩
在《zookeeper快速入门——简介》一文中,我们介绍了zookeeper的机制。但是还是比较抽象,没有直观感受到它在分布式系统中的应用。本文我们使用一个例子,三次迭代演进,来说明Zookeeper Client端和Server端如何配合以实现分布式协作。(转载请指明出于breaksoftware的csdn博客)
为了例子足够简单明确,我们以实现“分布式锁”为例。所谓分布式锁,就是在一个分布式系统中,各个子系统可以共享的同一把“锁”。这样大家可以在这把锁的协调下,进行协作。
我们可以尝试在Zookeeper Server的节点树上创建一个特定名称的节点。如果创建成功了,则认为获取到了锁。Client可以执行相应业务逻辑,然后通知Server删除该节点以释放锁。其他Client可能在此时正好去创建该节点,并成功了,那么它就获得了锁。其他创建失败的Client则被认为没有获得锁,则继续等待和尝试。
可能此时你已经意识到一个问题:如果某个获得锁的Client和Server断开了连接,而没有机会通知Server删除test_lock文件。那就导致整个系统处于“死锁”状态。
不用担心,zookeeper设计了“临时”节点的概念。“临时”节点由Client向Server端请求创建,一旦Client和Server连接断开,这个Client创建的“临时”节点将被删除。这样我们就不用担心因为连接断开而导致的问题了。和普通节点一样,“临时”节点也可以被Client主动删除。
基本思路理清楚后,我们开始着手编写这块逻辑。为了简单,我们在一个进程内部使用多线程技术模拟分布在不同机器上的Client端。
- #include <stdio.h>
- #include <stdlib.h>
- #include <string.h>
- #include <unistd.h>
- #include <pthread.h>
- #include <zookeeper.h>
- #include <zookeeper_log.h>
第一步我们要使用zookeeper_init方法去连接Zookeeper Server。该函数原型如下
- ZOOAPI zhandle_t* zookeeper_init (
- const char * host,
- watcher_fn watcher,
- int recv_timeout,
- const clientid_t * clientid,
- void * context,
- int flags
- )
该方法创建了一个zhandle_t指针和一个与之绑定的连接session,之后我们将在一直要使用这个指针和Server进行通信。
但是这个函数有个陷阱:即使返回了一个可用指针,可是与之绑定的session此时不一定可用。我们需要等到ZOO_CONNECTED_STATE消息到来才能确认。此时我们就要借助zookeeper中无处不在的监视功能(watcher)。
zookeeper_init方法第二个参数传递的是一个回调函数地址——watcher,第五个参数传递的是这个回调函数可以使用的上下文信息——context。
为了让回调函数可以通知工作线程session已经可用,我们可以把上下文信息设置为一个包含条件变量的结构watchctx_t
- typedef struct watchctx_t {
- pthread_cond_t cond;
- pthread_mutex_t cond_lock;
- } watchctx_t;
这样在回调函数中,如果我们收到ZOO_CONNECTED_STATE通知,就触发条件变量
- void main_watcher(zhandle_t* zh, int type, int state,
- const char* path, void* watcherCtx)
- {
- if (type == ZOO_SESSION_EVENT) {
- watchctx_t *ctx = (watchctx_t*)watcherCtx;
- if (state == ZOO_CONNECTED_STATE) {
- pthread_cond_signal(&ctx->cond);
- }
- }
- }
在调用zookeeper_init方法后,工作线程一直等待条件变量,如果超过设置的超时时间,就认为连接失败
- int init_watchctx(watchctx_t* ctx) {
- if (0 != pthread_cond_init(&ctx->cond, NULL)) {
- fprintf(stderr, "condition init error\n");
- return -1;
- }
-
- if (0 != pthread_mutex_init(&ctx->cond_lock, NULL)) {
- fprintf(stderr, "mutex init error\n");
- pthread_cond_destroy(&ctx->cond);
- return -2;
- }
-
- return 0;
- }
- zhandle_t* init() {
- const char* host = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
- int timeout = 30000;
- zhandle_t* zh = NULL;
-
- watchctx_t ctx;
- if (0 != init_watchctx(&ctx)) {
- return zh;
- }
-
- zh = zookeeper_init(host, main_watcher, timeout, 0, &ctx, 0);
- if (zh == NULL) {
- fprintf(stderr, "Error when connecting to zookeeper servers...\n");
- pthread_cond_destroy(&ctx.cond);
- pthread_mutex_destroy(&ctx.cond_lock);
- return zh;
- }
-
- struct timeval now;
- struct timespec outtime;
- gettimeofday(&now, NULL);
- outtime.tv_sec = now.tv_sec + 1;
- outtime.tv_nsec = now.tv_usec * 1000;
-
- pthread_mutex_lock(&ctx.cond_lock);
- int wait_result = pthread_cond_timedwait(&ctx.cond, &ctx.cond_lock, &outtime);
- pthread_mutex_unlock(&ctx.cond_lock);
-
- pthread_cond_destroy(&ctx.cond);
- pthread_mutex_destroy(&ctx.cond_lock);
-
- if (0 != wait_result) {
- fprintf(stderr, "Connecting to zookeeper servers timeout...\n");
- zookeeper_close(zh);
- zh = NULL;
- return zh;
- }
-
- return zh;
- }

解决了连接问题,后面的逻辑就简单了。我们使用zoo_create方法创建一个路径为/test_lock的临时节点,然后通过返回结果判断是否获得锁
- void thread_routine(void* ptr) {
-
- zhandle_t* zh = init();
- if (!zh) {
- return;
- }
-
- const char* lock_data = "lock";
- const char* lock_path = "/test_lock";
- int ret = ZNODEEXISTS;
- do {
- ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
- &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
- if (ZNODEEXISTS == ret) {
- //fprintf(stderr, "lock exist\n");
- continue;
- }
- else if (ZOK == ret) {
- pthread_t pid = pthread_self();
- fprintf(stdout, "%lu get lock\n", (long long)pid);
- zoo_delete(zh, lock_path, -1);
- sleep(1);
- }
- else {
- fprintf(stderr, "Error %d for %s\n", ret, "create");
- break;
- }
- } while (1);
-
- zookeeper_close(zh);
- }

上述代码19行开始的逻辑表示这个线程获取了锁,它只是简单的打印出get lock,然后调用zoo_delete删除节点——释放锁。
这个函数使用一个while死循环来控制业务进行,这种不停调用zoo_create去检测是否获得锁的方法非常浪费资源。那我们如何对这个函数进行改造?
如果我们可以基于事件驱动监控/test_lock节点状态就好了。zookeeper也提供了这种方式——还是watcher。
- void thread_routine(void* ptr) {
-
- zhandle_t* zh = init();
- if (!zh) {
- return;
- }
-
- watchctx_t ctx;
- if (0 != init_watchctx(&ctx)) {
- return;
- }
-
- const char* lock_data = "lock";
- const char* lock_path = "/test_lock";
- int ret = ZNODEEXISTS;
- do {
- struct Stat stat;
- int cur_st = zoo_wexists(zh, lock_path, lock_watcher, &ctx, &stat);
- if (ZOK == cur_st) {
- //fprintf(stdout, "wait\n");
- pthread_mutex_lock(&ctx.cond_lock);
- pthread_cond_wait(&ctx.cond, &ctx.cond_lock);
- pthread_mutex_unlock(&ctx.cond_lock);
- }
-
- ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
- &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL, NULL, 0);
- if (ZNODEEXISTS == ret) {
- //fprintf(stderr, "lock exist\n");
- }
- else if (ZOK == ret) {
- pthread_t pid = pthread_self();
- fprintf(stdout, "%lu get lock\n", (long long)pid);
- zoo_delete(zh, lock_path, -1);
- }
- else {
- fprintf(stderr, "Error %d for %s\n", ret, "create");
- }
-
- } while (1);
-
- zookeeper_close(zh);
- pthread_cond_destroy(&ctx.cond);
- pthread_mutex_destroy(&ctx.cond_lock);
- }

第18行,我们调用zoo_wexists方法监控节点状态。如果监控点设置成功,则等待上文中创建的条件变量。该条件变量在zoo_wexists参数的回调函数中被设置
- void lock_watcher(zhandle_t* zh, int type, int state,
- const char* path, void* watcherCtx)
- {
- //sleep(1);
- //fprintf(stdout, "lock_watcher: %s %d, %d\n", path, type, state);
- if (type == ZOO_DELETED_EVENT) {
- //fprintf(stdout, "delete %s\n", path);
- watchctx_t* ctx = (watchctx_t*)watcherCtx;
- pthread_cond_signal(&ctx->cond);
- }
- else {
- //fprintf(stdout, "add %s\n", path);
- struct Stat stat;
- zoo_wexists(zh, path, lock_watcher, watcherCtx, &stat);
- }
- }

zookeeper的监控点是一次性的,即如果一次被触发则不再触发。于是在这个回调函数中,如果我们发现节点不是被删除——监控到它被其他Client创建,就再次注册该监控点。
这样我们就使用了相对高大上的事件通知机制。但是问题随之而来,这种方式会引起惊群现象。即在一个Client释放锁后,其他Client都会尝试去调用zoo_create去获取锁,这会造成系统抖动很强烈。
我们继续改进锁的设计。现在我们换个思路,让这些Client排着队去尝试获取锁。如果做呢?
每个Client在Server上按顺序创建一个节点,并监控比自己小的那个节点。如果比自己小的那个节点(最接近自己的)被删除了,则意味着:
采用这种方式,我们可以最大限度的减少获取锁的行为。但是这对zookeeper提出了一个要求,我们可以原子性的创建包含单调递增数字的路径的节点。非常幸运的是,zookeeper的确提供了这样的方式——顺序节点。
- void thread_routine(void* ptr) {
-
- zhandle_t* zh = init();
- if (!zh) {
- return;
- }
-
- watchctx_t ctx;
- if (0 != init_watchctx(&ctx)) {
- return;
- }
-
- #define ROOT_PATH "/test_seq_lock"
- const char* root_path = ROOT_PATH;
- const char* lock_data = "lock";
- const char* lock_path = ROOT_PATH"/0";
- int ret = ZNODEEXISTS;
- do {
- const int seq_path_lenght = 512;
- char sequence_path[seq_path_lenght];
- ret = zoo_create(zh, lock_path, lock_data, strlen(lock_data),
- &ZOO_OPEN_ACL_UNSAFE, ZOO_EPHEMERAL | ZOO_SEQUENCE,
- sequence_path, sizeof(sequence_path) - 1);
- if (ZNODEEXISTS == ret) {
- //fprintf(stderr, "lock exist\n");
- }
- else if (ZOK == ret) {
- ret = wait_for_lock(zh, &ctx, root_path, sequence_path);
- if (ZOK == ret) {
- pthread_t pid = pthread_self();
- fprintf(stdout, "%lu %s get lock\n", (long long)pid, sequence_path);
- sleep(0.1);
- }
- zoo_delete(zh, sequence_path, -1);
- }
- else if (ZNONODE == ret) {
- ret = zoo_create(zh, root_path, lock_data, strlen(lock_data),
- &ZOO_OPEN_ACL_UNSAFE, 0, NULL, 0);
- if (ZNODEEXISTS != ret && ZOK != ret) {
- fprintf(stderr, "Error %d for %s\n", ret, "create root path");
- break;
- }
- }
- else {
- fprintf(stderr, "Error %d for %s\n", ret, "create");
- }
-
- } while (1);
-
- zookeeper_close(zh);
- pthread_cond_destroy(&ctx.cond);
- pthread_mutex_destroy(&ctx.cond_lock);
- }

第23行,我们给zoo_create方法传入了一个路径空间用于接收创建的有序节点路径。第28行,我们将这个路径连同条件变量一起传入自定义函数wait_for_lock去等待获得锁的时机。
- int search_watch_neighbor(zhandle_t* zh, const char* root_path,
- const char* cur_name,
- char* neighbor_name, int len)
- {
- struct String_vector strings;
- int rc = zoo_get_children(zh, root_path, 0, &strings);
- if (ZOK != rc || 0 == strings.count) {
- return ZNOTEMPTY;
- }
-
- int neighbor = -1;
- for (int i = 0; i < strings.count; i++) {
- int cmp = strcmp(cur_name, strings.data[i]);
- if (cmp <= 0) {
- continue;
- }
-
- if (-1 == neighbor) {
- neighbor = i;
- continue;
- }
-
- cmp = strcmp(strings.data[neighbor], strings.data[i]);
- if (cmp >= 0) {
- continue;
- }
- neighbor = i;
- }
-
- if (-1 == neighbor) {
- *neighbor_name = 0;
- return ZNONODE;
- }
-
- int neighbor_name_len = strlen(strings.data[neighbor]);
- if (len < neighbor_name_len - 1) {
- *neighbor_name = 0;
- return ZBADARGUMENTS;
- }
-
- memcpy(neighbor_name, strings.data[neighbor], neighbor_name_len);
- *(neighbor_name + neighbor_name_len) = '\0';
- fprintf(stdout, "********\n self: %s neighbor:%s\n*********\n", cur_name, neighbor_name);
- return ZOK;
- }
-
- void neighbor_watcher(zhandle_t* zh, int type, int state,
- const char* path, void* watcherCtx)
- {
- if (type == ZOO_DELETED_EVENT) {
- watchctx_t* ctx = (watchctx_t*)watcherCtx;
- const int path_len_max = 512;
- char neighbor_name[path_len_max];
- int ret = search_watch_neighbor(zh, ctx->root_path,
- ctx->cur_name, neighbor_name, sizeof(neighbor_name));
- if (ZNONODE == ret) {
- pthread_cond_signal(&ctx->cond);
- }
- else if (ZOK == ret) {
- char neighbor_path[path_len_max];
- sprintf(neighbor_path, "%s/%s", ctx->root_path, neighbor_name);
- struct Stat stat;
- zoo_wexists(zh, neighbor_path, neighbor_watcher, watcherCtx, &stat);
- }
- }
- }
-
- int wait_for_lock(zhandle_t* zh, watchctx_t* ctx, const char* root_path, const char* sequence_path) {
- strcpy(ctx->root_path, root_path);
- strcpy(ctx->cur_name, sequence_path + strlen(root_path) + 1);
-
- const int path_len_max = 512;
- char neighbor_name[path_len_max];
- int status = ZOK;
-
- do {
- int ret = search_watch_neighbor(zh, ctx->root_path,
- ctx->cur_name, neighbor_name, sizeof(neighbor_name));
-
- char neighbor_path[path_len_max];
- sprintf(neighbor_path, "%s/%s", root_path, neighbor_name);
-
- pthread_t pid = pthread_self();
- fprintf(stdout, "%lu get neighbor info: %d %s\n", (long long)pid, ret, neighbor_path);
-
- if (ZNONODE == ret) {
- status = ZOK;
- break;
- }
- else if (ZOK == ret) {
- struct Stat stat;
- if (ZOK == zoo_wexists(zh, neighbor_path, neighbor_watcher, ctx, &stat)) {
- pthread_mutex_lock(&ctx->cond_lock);
- pthread_cond_wait(&ctx->cond, &ctx->cond_lock);
- pthread_mutex_unlock(&ctx->cond_lock);
- }
- else {
- continue;
- }
- }
- else {
- status = ZSYSTEMERROR;
- break;
- }
-
- } while(1);
- return status;
- }

再结合main函数的实现,两种不方式设计的分布式锁都可以运行起来
- #define countof(x) sizeof(x)/sizeof(x[0])
-
- int main(int argc, const char *argv[]) {
- const int thread_num = 3;
- pthread_t ids[thread_num];
-
- for (int i = 0; i < countof(ids); i++) {
- pthread_create(&ids[i], NULL, (void*)thread_routine, NULL);
- }
-
- for (int i = 0; i < countof(ids); i++) {
- pthread_join(ids[i], NULL);
- }
-
- return 0;
- }

将上述文件保存为lock_test.c,然后调用下面的指令编译
gcc -o lock_test lock_test.c -I/home/work/fangliang/zookeeper-3.4.11/src/c/generated -I/home/work/fangliang/zookeeper-3.4.11/src/c/include -L/home/work/fangliang/zookeeper-3.4.11/src/c/.libs -lzookeeper_mt -DTHREADED -std=c99
关于zookeeper库的编译,网上有很多。我编译起来还算顺利,只是在找不到so时候使用下面指令指定下查找路径
export LD_LIBRARY_PATH=/home/work/fangliang/zookeeper-3.4.11/src/c/.libs:$LD_LIBRARY_PATH
参考资料
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。