当前位置:   article > 正文

漫话Redis源码之七十八_fatal: can't initialize background jobs.

fatal: can't initialize background jobs.


  1. #include "server.h"
  2. #include "bio.h"
  3. static pthread_t bio_threads[BIO_NUM_OPS];
  4. static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
  5. static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
  6. static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
  7. static list *bio_jobs[BIO_NUM_OPS];
  8. /* The following array is used to hold the number of pending jobs for every
  9. * OP type. This allows us to export the bioPendingJobsOfType() API that is
  10. * useful when the main thread wants to perform some operation that may involve
  11. * objects shared with the background thread. The main thread will just wait
  12. * that there are no longer jobs of this type to be executed before performing
  13. * the sensible operation. This data is also useful for reporting. */
  14. static unsigned long long bio_pending[BIO_NUM_OPS];
  15. /* This structure represents a background Job. It is only used locally to this
  16. * file as the API does not expose the internals at all. */
  17. struct bio_job {
  18. time_t time; /* Time at which the job was created. */
  19. /* Job specific arguments.*/
  20. int fd; /* Fd for file based background jobs */
  21. lazy_free_fn *free_fn; /* Function that will free the provided arguments */
  22. void *free_args[]; /* List of arguments to be passed to the free function */
  23. };
  /* Minimum stack size requested for the bio threads, so that they have
   * enough stack to perform all the things we do in the main thread. */
  #define REDIS_THREAD_STACK_SIZE (4*1024*1024)

  /* Entry point of every bio worker thread (defined later in this file). */
  void *bioProcessBackgroundJobs(void *arg);
  28. /* Initialize the background system, spawning the thread. */
  29. void bioInit(void) {
  30. pthread_attr_t attr;
  31. pthread_t thread;
  32. size_t stacksize;
  33. int j;
  34. /* Initialization of state vars and objects */
  35. for (j = 0; j < BIO_NUM_OPS; j++) {
  36. pthread_mutex_init(&bio_mutex[j],NULL);
  37. pthread_cond_init(&bio_newjob_cond[j],NULL);
  38. pthread_cond_init(&bio_step_cond[j],NULL);
  39. bio_jobs[j] = listCreate();
  40. bio_pending[j] = 0;
  41. }
  42. /* Set the stack size as by default it may be small in some system */
  43. pthread_attr_init(&attr);
  44. pthread_attr_getstacksize(&attr,&stacksize);
  45. if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
  46. while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
  47. pthread_attr_setstacksize(&attr, stacksize);
  48. /* Ready to spawn our threads. We use the single argument the thread
  49. * function accepts in order to pass the job ID the thread is
  50. * responsible of. */
  51. for (j = 0; j < BIO_NUM_OPS; j++) {
  52. void *arg = (void*)(unsigned long) j;
  53. if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
  54. serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
  55. exit(1);
  56. }
  57. bio_threads[j] = thread;
  58. }
  59. }
  60. void bioSubmitJob(int type, struct bio_job *job) {
  61. job->time = time(NULL);
  62. pthread_mutex_lock(&bio_mutex[type]);
  63. listAddNodeTail(bio_jobs[type],job);
  64. bio_pending[type]++;
  65. pthread_cond_signal(&bio_newjob_cond[type]);
  66. pthread_mutex_unlock(&bio_mutex[type]);
  67. }
  68. void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
  69. va_list valist;
  70. /* Allocate memory for the job structure and all required
  71. * arguments */
  72. struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
  73. job->free_fn = free_fn;
  74. va_start(valist, arg_count);
  75. for (int i = 0; i < arg_count; i++) {
  76. job->free_args[i] = va_arg(valist, void *);
  77. }
  78. va_end(valist);
  79. bioSubmitJob(BIO_LAZY_FREE, job);
  80. }
  81. void bioCreateCloseJob(int fd) {
  82. struct bio_job *job = zmalloc(sizeof(*job));
  83. job->fd = fd;
  84. bioSubmitJob(BIO_CLOSE_FILE, job);
  85. }
  86. void bioCreateFsyncJob(int fd) {
  87. struct bio_job *job = zmalloc(sizeof(*job));
  88. job->fd = fd;
  89. bioSubmitJob(BIO_AOF_FSYNC, job);
  90. }
  91. void *bioProcessBackgroundJobs(void *arg) {
  92. struct bio_job *job;
  93. unsigned long type = (unsigned long) arg;
  94. sigset_t sigset;
  95. /* Check that the type is within the right interval. */
  96. if (type >= BIO_NUM_OPS) {
  97. serverLog(LL_WARNING,
  98. "Warning: bio thread started with wrong type %lu",type);
  99. return NULL;
  100. }
  101. switch (type) {
  102. case BIO_CLOSE_FILE:
  103. redis_set_thread_title("bio_close_file");
  104. break;
  105. case BIO_AOF_FSYNC:
  106. redis_set_thread_title("bio_aof_fsync");
  107. break;
  108. case BIO_LAZY_FREE:
  109. redis_set_thread_title("bio_lazy_free");
  110. break;
  111. }
  112. redisSetCpuAffinity(server.bio_cpulist);
  113. makeThreadKillable();
  114. pthread_mutex_lock(&bio_mutex[type]);
  115. /* Block SIGALRM so we are sure that only the main thread will
  116. * receive the watchdog signal. */
  117. sigemptyset(&sigset);
  118. sigaddset(&sigset, SIGALRM);
  119. if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
  120. serverLog(LL_WARNING,
  121. "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
  122. while(1) {
  123. listNode *ln;
  124. /* The loop always starts with the lock hold. */
  125. if (listLength(bio_jobs[type]) == 0) {
  126. pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
  127. continue;
  128. }
  129. /* Pop the job from the queue. */
  130. ln = listFirst(bio_jobs[type]);
  131. job = ln->value;
  132. /* It is now possible to unlock the background system as we know have
  133. * a stand alone job structure to process.*/
  134. pthread_mutex_unlock(&bio_mutex[type]);
  135. /* Process the job accordingly to its type. */
  136. if (type == BIO_CLOSE_FILE) {
  137. close(job->fd);
  138. } else if (type == BIO_AOF_FSYNC) {
  139. /* The fd may be closed by main thread and reused for another
  140. * socket, pipe, or file. We just ignore these errno because
  141. * aof fsync did not really fail. */
  142. if (redis_fsync(job->fd) == -1 &&
  143. errno != EBADF && errno != EINVAL)
  144. {
  145. int last_status;
  146. atomicGet(server.aof_bio_fsync_status,last_status);
  147. atomicSet(server.aof_bio_fsync_status,C_ERR);
  148. atomicSet(server.aof_bio_fsync_errno,errno);
  149. if (last_status == C_OK) {
  150. serverLog(LL_WARNING,
  151. "Fail to fsync the AOF file: %s",strerror(errno));
  152. }
  153. } else {
  154. atomicSet(server.aof_bio_fsync_status,C_OK);
  155. }
  156. } else if (type == BIO_LAZY_FREE) {
  157. job->free_fn(job->free_args);
  158. } else {
  159. serverPanic("Wrong job type in bioProcessBackgroundJobs().");
  160. }
  161. zfree(job);
  162. /* Lock again before reiterating the loop, if there are no longer
  163. * jobs to process we'll block again in pthread_cond_wait(). */
  164. pthread_mutex_lock(&bio_mutex[type]);
  165. listDelNode(bio_jobs[type],ln);
  166. bio_pending[type]--;
  167. /* Unblock threads blocked on bioWaitStepOfType() if any. */
  168. pthread_cond_broadcast(&bio_step_cond[type]);
  169. }
  170. }
  171. /* Return the number of pending jobs of the specified type. */
  172. unsigned long long bioPendingJobsOfType(int type) {
  173. unsigned long long val;
  174. pthread_mutex_lock(&bio_mutex[type]);
  175. val = bio_pending[type];
  176. pthread_mutex_unlock(&bio_mutex[type]);
  177. return val;
  178. }
  179. /* If there are pending jobs for the specified type, the function blocks
  180. * and waits that the next job was processed. Otherwise the function
  181. * does not block and returns ASAP.
  182. *
  183. * The function returns the number of jobs still to process of the
  184. * requested type.
  185. *
  186. * This function is useful when from another thread, we want to wait
  187. * a bio.c thread to do more work in a blocking way.
  188. */
  189. unsigned long long bioWaitStepOfType(int type) {
  190. unsigned long long val;
  191. pthread_mutex_lock(&bio_mutex[type]);
  192. val = bio_pending[type];
  193. if (val != 0) {
  194. pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
  195. val = bio_pending[type];
  196. }
  197. pthread_mutex_unlock(&bio_mutex[type]);
  198. return val;
  199. }
  200. /* Kill the running bio threads in an unclean way. This function should be
  201. * used only when it's critical to stop the threads for some reason.
  202. * Currently Redis does this only on crash (for instance on SIGSEGV) in order
  203. * to perform a fast memory check without other threads messing with memory. */
  204. void bioKillThreads(void) {
  205. int err, j;
  206. for (j = 0; j < BIO_NUM_OPS; j++) {
  207. if (bio_threads[j] == pthread_self()) continue;
  208. if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {
  209. if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
  210. serverLog(LL_WARNING,
  211. "Bio thread for job type #%d can not be joined: %s",
  212. j, strerror(err));
  213. } else {
  214. serverLog(LL_WARNING,
  215. "Bio thread for job type #%d terminated",j);
  216. }
  217. }
  218. }
  219. }

