赞
踩
对于redis是否是多线程,这就是证据,该代码基于最新的redis源码说明。
redis启动的时候,当所有模块加载完成后,最终会调用InitServerLast方法,启动后台线程(Background io,bio)。
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
对于关键变量的说明:
初始化主要是在bioInit。bio_close_file用于关闭文件、bio_aof_fsync用于文件在os buffer缓冲区的刷盘、bio_lazy_free异步删除key。
该方法对每个BIO_NUM_OPS,去调用pthread_create创建一个线程(fork才是进程),并设置了栈的大小,启动完成后,会调用bioProcessBackgroundJobs方法。
/* Initialize the background system, spawning the thread. */ void bioInit(void) { pthread_attr_t attr; pthread_t thread; size_t stacksize; int j; /* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); pthread_attr_getstacksize(&attr,&stacksize); if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */ while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsigned long) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } }
主要是在bioProcessBackgroundJobs方法里。主要流程是从任务链表取出,并根据BIO_NUM_OPS,执行不同流程。
详细流程:
void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; unsigned long type = (unsigned long) arg; sigset_t sigset; /* Check that the type is within the right interval. */ if (type >= BIO_NUM_OPS) { serverLog(LL_WARNING, "Warning: bio thread started with wrong type %lu",type); return NULL; } switch (type) { case BIO_CLOSE_FILE: redis_set_thread_title("bio_close_file"); break; case BIO_AOF_FSYNC: redis_set_thread_title("bio_aof_fsync"); break; case BIO_LAZY_FREE: redis_set_thread_title("bio_lazy_free"); break; } redisSetCpuAffinity(server.bio_cpulist); /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); pthread_mutex_lock(&bio_mutex[type]); /* Block SIGALRM so we are sure that only the main thread will * receive the watchdog signal. */ sigemptyset(&sigset); sigaddset(&sigset, SIGALRM); if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) serverLog(LL_WARNING, "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno)); while(1) { listNode *ln; /* The loop always starts with the lock hold. */ if (listLength(bio_jobs[type]) == 0) { pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ ln = listFirst(bio_jobs[type]); job = ln->value; /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ if (type == BIO_CLOSE_FILE) { close((long)job->arg1); } else if (type == BIO_AOF_FSYNC) { redis_fsync((long)job->arg1); } else if (type == BIO_LAZY_FREE) { /* What we free changes depending on what arguments are set: * arg1 -> free the object at pointer. * arg2 & arg3 -> free two dictionaries (a Redis DB). * only arg3 -> free the skiplist. */ if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1); else if (job->arg2 && job->arg3) lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread(job->arg3); } else { serverPanic("Wrong job type in bioProcessBackgroundJobs()."); } zfree(job); /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ pthread_mutex_lock(&bio_mutex[type]); listDelNode(bio_jobs[type],ln); bio_pending[type]--; /* Unblock threads blocked on bioWaitStepOfType() if any. */ pthread_cond_broadcast(&bio_step_cond[type]); } }
释放对象,什么类型的对象都可以被释放。
void lazyfreeFreeObjectFromBioThread(robj *o) { decrRefCount(o); atomicDecr(lazyfree_objects,1); } void decrRefCount(robj *o) { if (o->refcount == 1) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; case OBJ_SET: freeSetObject(o); break; case OBJ_ZSET: freeZsetObject(o); break; case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: freeModuleObject(o); break; case OBJ_STREAM: freeStreamObject(o); break; default: serverPanic("Unknown object type"); break; } zfree(o); } else { if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount--; } }
释放字典数据结构,不光要释放字典,也要释放里面的key和val。
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { size_t numkeys = dictSize(ht1); dictRelease(ht1); dictRelease(ht2); atomicDecr(lazyfree_objects,numkeys); } void dictRelease(dict *d) { _dictClear(d,&d->ht[0],NULL); _dictClear(d,&d->ht[1],NULL); zfree(d); } int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { unsigned long i; /* Free all the elements */ for (i = 0; i < ht->size && ht->used > 0; i++) { dictEntry *he, *nextHe; if (callback && (i & 65535) == 0) callback(d->privdata); if ((he = ht->table[i]) == NULL) continue; while(he) { nextHe = he->next; dictFreeKey(d, he); dictFreeVal(d, he); zfree(he); ht->used--; he = nextHe; } } /* Free the table and the allocated cache structure */ zfree(ht->table); /* Re-initialize the table */ _dictReset(ht); return DICT_OK; /* never fails */ } static void _dictReset(dictht *ht) { ht->table = NULL; ht->size = 0; ht->sizemask = 0; ht->used = 0; }
释放跳表的对象。
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) { size_t len = rt->numele; raxFree(rt); atomicDecr(lazyfree_objects,len); } void raxFree(rax *rax) { raxFreeWithCallback(rax,NULL); } void raxFreeWithCallback(rax *rax, void (*free_callback)(void*)) { raxRecursiveFree(rax,rax->head,free_callback); assert(rax->numnodes == 0); rax_free(rax); } void raxRecursiveFree(rax *rax, raxNode *n, void (*free_callback)(void*)) { debugnode("free traversing",n); int numchildren = n->iscompr ? 1 : n->size; raxNode **cp = raxNodeLastChildPtr(n); while(numchildren--) { raxNode *child; memcpy(&child,cp,sizeof(child)); raxRecursiveFree(rax,child,free_callback); cp--; } debugnode("free depth-first",n); if (free_callback && n->iskey && !n->isnull) free_callback(raxGetData(n)); rax_free(n); rax->numnodes--; }
如果有指定类型的挂起作业,函数将阻塞并等待处理下一个作业。否则,函数不会阻塞并尽快返回。该函数返回仍要处理的请求类型的作业数量。当从另一个线程,我们想等待一个bio.c线程以阻塞的方式做更多的工作时,这个函数是有用的。
unsigned long long bioWaitStepOfType(int type) {
unsigned long long val;
pthread_mutex_lock(&bio_mutex[type]);
val = bio_pending[type];
if (val != 0) {
pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
val = bio_pending[type];
}
pthread_mutex_unlock(&bio_mutex[type]);
return val;
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。