当前位置:   article > 正文

redis bio线程任务队列_fatal: can't initialize background jobs.

fatal: can't initialize background jobs.

bio部分是redis中负责后台进行文件关闭,aof文件缓冲区同步到磁盘,清理对象三种任务的部分。

  1. /* Background job opcodes */
  2. #define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
  3. #define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
  4. #define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
  5. #define BIO_NUM_OPS 3

其中0,1,2代表三种工作内容,3则为bio中管理的后台任务类型数量,也就是前三者的数量。

  1. static pthread_t bio_threads[BIO_NUM_OPS];
  2. static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
  3. static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
  4. static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
  5. static list *bio_jobs[BIO_NUM_OPS];

bio_threads为存放工作线程的数组,数组大小为后台任务类型数量。

bio_mutex对应工作线程所需要的锁。

bio_newjob_cond为当任务队列为空,工作线程等待时等待被唤醒的cond。

bio_step_cond则为其他线程等待某任务执行时,监听的cond以试图被唤醒。

biojobs则为任务队列,是链表,用来存放相应类型准备执行的任务。

  1. for (j = 0; j < BIO_NUM_OPS; j++) {
  2. void *arg = (void*)(unsigned long) j;
  3. if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
  4. serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
  5. exit(1);
  6. }
  7. bio_threads[j] = thread;
  8. }

在bio的初试化方法bioInit()方法中,会依次建立三个执行bioProcessBackgroudJobs()方法的工作线程,存放在bio_threads中。

  1. void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
  2. struct bio_job *job = zmalloc(sizeof(*job));
  3. job->time = time(NULL);
  4. job->arg1 = arg1;
  5. job->arg2 = arg2;
  6. job->arg3 = arg3;
  7. pthread_mutex_lock(&bio_mutex[type]);
  8. listAddNodeTail(bio_jobs[type],job);
  9. bio_pending[type]++;
  10. pthread_cond_signal(&bio_newjob_cond[type]);
  11. pthread_mutex_unlock(&bio_mutex[type]);
  12. }

当需要执行相应类型的后台任务的时候,需要通过bioCreateBackgroudJob()方法,首先,申请一个bio_job的空间,设置相应参数,然后根据任务类型加锁,将任务加入至任务队列,增加统计对应任务类型执行数量bio_pending,接下来唤醒处于等待bio_newjob_cond执行新任务的工作线程,以便执行相应的任务,最后解锁。

 

可以看到工作线程的真正执行逻辑的地方,bioProcessBackgrounJobs()。

  1. if (type >= BIO_NUM_OPS) {
  2. serverLog(LL_WARNING,
  3. "Warning: bio thread started with wrong type %lu",type);
  4. return NULL;
  5. }
  6. /* Make the thread killable at any time, so that bioKillThreads()
  7. * can work reliably. */
  8. pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
  9. pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

在其开始,如果类型为不支持的任务类型,会直接结束。并将该线程设置为可随时终止的线程。

  1. while(1) {
  2. listNode *ln;
  3. /* The loop always starts with the lock hold. */
  4. if (listLength(bio_jobs[type]) == 0) {
  5. pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
  6. continue;
  7. }
  8. /* Pop the job from the queue. */
  9. ln = listFirst(bio_jobs[type]);
  10. job = ln->value;
  11. /* It is now possible to unlock the background system as we know have
  12. * a stand alone job structure to process.*/
  13. pthread_mutex_unlock(&bio_mutex[type]);
  14. /* Process the job accordingly to its type. */
  15. if (type == BIO_CLOSE_FILE) {
  16. close((long)job->arg1);
  17. } else if (type == BIO_AOF_FSYNC) {
  18. redis_fsync((long)job->arg1);
  19. } else if (type == BIO_LAZY_FREE) {
  20. /* What we free changes depending on what arguments are set:
  21. * arg1 -> free the object at pointer.
  22. * arg2 & arg3 -> free two dictionaries (a Redis DB).
  23. * only arg3 -> free the skiplist. */
  24. if (job->arg1)
  25. lazyfreeFreeObjectFromBioThread(job->arg1);
  26. else if (job->arg2 && job->arg3)
  27. lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
  28. else if (job->arg3)
  29. lazyfreeFreeSlotsMapFromBioThread(job->arg3);
  30. } else {
  31. serverPanic("Wrong job type in bioProcessBackgroundJobs().");
  32. }
  33. zfree(job);
  34. /* Lock again before reiterating the loop, if there are no longer
  35. * jobs to process we'll block again in pthread_cond_wait(). */
  36. pthread_mutex_lock(&bio_mutex[type]);
  37. listDelNode(bio_jobs[type],ln);
  38. bio_pending[type]--;
  39. /* Unblock threads blocked on bioWaitStepOfType() if any. */
  40. pthread_cond_broadcast(&bio_step_cond[type]);
  41. }

接下来,就会进入循环,在循环开始,如果对应任务类型的任务队列为空,阻塞并准备被bio_newjob_cond唤醒。当上文的bioCreateBackgroudJob()方法被调用,证明任务队列中有新任务被增加,被唤醒后从新开始循环,在队伍列表中的头部获取任务,并根据对应的任务类型取得job中的参数执行。

在执行完毕之后释放job的空间,加锁并删除队列里的任务,对应统计任务数量的bio_pending减一,表示任务数量减一,并广播唤醒因为对应类型的bio_step_cond被阻塞的线程。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/409177
推荐阅读
相关标签
  

闽ICP备14008679号