赞
踩
bio部分是redis中负责后台进行文件关闭,aof文件缓冲区同步到磁盘,清理对象三种任务的部分。
-
- /* Background job opcodes */
- #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
- #define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
- #define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
- #define BIO_NUM_OPS 3
其中0,1,2代表三种工作内容,3则为bio中管理的后台任务类型数量,也就是前三者的数量。
- static pthread_t bio_threads[BIO_NUM_OPS];
- static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
- static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
- static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
- static list *bio_jobs[BIO_NUM_OPS];
bio_threads为存放工作线程的数组,数组大小为后台任务类型数量。
bio_mutex对应工作线程所需要的锁。
bio_newjob_cond为当任务队列为空,工作线程等待时等待被唤醒的cond。
bio_step_cond则为其他线程等待某任务执行时,监听的cond以试图被唤醒。
biojobs则为任务队列,是链表,用来存放相应类型准备执行的任务。
- for (j = 0; j < BIO_NUM_OPS; j++) {
- void *arg = (void*)(unsigned long) j;
- if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
- serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
- exit(1);
- }
- bio_threads[j] = thread;
- }
在bio的初试化方法bioInit()方法中,会依次建立三个执行bioProcessBackgroudJobs()方法的工作线程,存放在bio_threads中。
- void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
- struct bio_job *job = zmalloc(sizeof(*job));
-
- job->time = time(NULL);
- job->arg1 = arg1;
- job->arg2 = arg2;
- job->arg3 = arg3;
- pthread_mutex_lock(&bio_mutex[type]);
- listAddNodeTail(bio_jobs[type],job);
- bio_pending[type]++;
- pthread_cond_signal(&bio_newjob_cond[type]);
- pthread_mutex_unlock(&bio_mutex[type]);
- }
当需要执行相应类型的后台任务的时候,需要通过bioCreateBackgroudJob()方法,首先,申请一个bio_job的空间,设置相应参数,然后根据任务类型加锁,将任务加入至任务队列,增加统计对应任务类型执行数量bio_pending,接下来唤醒处于等待bio_newjob_cond执行新任务的工作线程,以便执行相应的任务,最后解锁。
可以看到工作线程的真正执行逻辑的地方,bioProcessBackgrounJobs()。
- if (type >= BIO_NUM_OPS) {
- serverLog(LL_WARNING,
- "Warning: bio thread started with wrong type %lu",type);
- return NULL;
- }
-
- /* Make the thread killable at any time, so that bioKillThreads()
- * can work reliably. */
- pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
- pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
在其开始,如果类型为不支持的任务类型,会直接结束。并将该线程设置为可随时终止的线程。
- while(1) {
- listNode *ln;
-
- /* The loop always starts with the lock hold. */
- if (listLength(bio_jobs[type]) == 0) {
- pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
- continue;
- }
- /* Pop the job from the queue. */
- ln = listFirst(bio_jobs[type]);
- job = ln->value;
- /* It is now possible to unlock the background system as we know have
- * a stand alone job structure to process.*/
- pthread_mutex_unlock(&bio_mutex[type]);
-
- /* Process the job accordingly to its type. */
- if (type == BIO_CLOSE_FILE) {
- close((long)job->arg1);
- } else if (type == BIO_AOF_FSYNC) {
- redis_fsync((long)job->arg1);
- } else if (type == BIO_LAZY_FREE) {
- /* What we free changes depending on what arguments are set:
- * arg1 -> free the object at pointer.
- * arg2 & arg3 -> free two dictionaries (a Redis DB).
- * only arg3 -> free the skiplist. */
- if (job->arg1)
- lazyfreeFreeObjectFromBioThread(job->arg1);
- else if (job->arg2 && job->arg3)
- lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
- else if (job->arg3)
- lazyfreeFreeSlotsMapFromBioThread(job->arg3);
- } else {
- serverPanic("Wrong job type in bioProcessBackgroundJobs().");
- }
- zfree(job);
-
- /* Lock again before reiterating the loop, if there are no longer
- * jobs to process we'll block again in pthread_cond_wait(). */
- pthread_mutex_lock(&bio_mutex[type]);
- listDelNode(bio_jobs[type],ln);
- bio_pending[type]--;
-
- /* Unblock threads blocked on bioWaitStepOfType() if any. */
- pthread_cond_broadcast(&bio_step_cond[type]);
- }

接下来,就会进入循环,在循环开始,如果对应任务类型的任务队列为空,阻塞并准备被bio_newjob_cond唤醒。当上文的bioCreateBackgroudJob()方法被调用,证明任务队列中有新任务被增加,被唤醒后从新开始循环,在队伍列表中的头部获取任务,并根据对应的任务类型取得job中的参数执行。
在执行完毕之后释放job的空间,加锁并删除队列里的任务,对应统计任务数量的bio_pending减一,表示任务数量减一,并广播唤醒因为对应类型的bio_step_cond被阻塞的线程。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。