当前位置:   article > 正文

workflow 01-mpoller_workflow源码

workflow源码

本篇开始介绍 workflow 源码 从里往外 进行了解 ,作为一个异步事件驱动服务器引擎,无论如何底层都是对事件的管理 和封装,本质就是一个事件收集器 ,可以这么理解,将io 事件 进行 收集回调 上成应用 ,简称”套娃“, workflow: C++并行计算与异步网络引擎 ⭐⭐⭐ (gitee.com)

  1. /*
  2. Copyright (c) 2019 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. Author: Xie Han (xiehan@sogou-inc.com)
  13. */
  14. #ifndef _MPOLLER_H_
  15. #define _MPOLLER_H_
  16. #include <stddef.h>
  17. #include "poller.h"
  18. /*
  19. * 1、poller ->>mpoller_t 管理
  20. * 2、结构体的别名
  21. */
  22. typedef struct __mpoller mpoller_t;
  23. #ifdef __cplusplus
  24. extern "C"
  25. {
  26. #endif
  27. mpoller_t *mpoller_create(const struct poller_params *params, size_t nthreads);
  28. int mpoller_start(mpoller_t *mpoller);
  29. void mpoller_stop(mpoller_t *mpoller);
  30. void mpoller_destroy(mpoller_t *mpoller);
  31. #ifdef __cplusplus
  32. }
  33. #endif
  34. struct __mpoller
  35. {
  36. void **nodes_buf;
  37. unsigned int nthreads;
  38. poller_t *poller[1];
  39. };
  40. static inline int mpoller_add(const struct poller_data *data, int timeout,
  41. mpoller_t *mpoller)
  42. {
  43. unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
  44. return poller_add(data, timeout, mpoller->poller[index]);
  45. }
  46. static inline int mpoller_del(int fd, mpoller_t *mpoller)
  47. {
  48. unsigned int index = (unsigned int)fd % mpoller->nthreads;
  49. return poller_del(fd, mpoller->poller[index]);
  50. }
  51. static inline int mpoller_mod(const struct poller_data *data, int timeout,
  52. mpoller_t *mpoller)
  53. {
  54. unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
  55. return poller_mod(data, timeout, mpoller->poller[index]);
  56. }
  57. static inline int mpoller_set_timeout(int fd, int timeout, mpoller_t *mpoller)
  58. {
  59. unsigned int index = (unsigned int)fd % mpoller->nthreads;
  60. return poller_set_timeout(fd, timeout, mpoller->poller[index]);
  61. }
  62. static inline int mpoller_add_timer(const struct timespec *value, void *context,
  63. mpoller_t *mpoller)
  64. {
  65. static unsigned int n = 0;
  66. unsigned int index = n++ % mpoller->nthreads;
  67. return poller_add_timer(value, context, mpoller->poller[index]);
  68. }
  69. #endif
  1. /*
  2. Copyright (c) 2019 Sogou, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. Author: Xie Han (xiehan@sogou-inc.com)
  13. */
  14. #include <stddef.h>
  15. #include <stdlib.h>
  16. #include "poller.h"
  17. #include "mpoller.h"
  18. extern poller_t *__poller_create(void **, const struct poller_params *);
  19. extern void __poller_destroy(poller_t *);
  20. static int __mpoller_create(const struct poller_params *params,
  21. mpoller_t *mpoller)
  22. {
  23. // 双指针 指向指针的指针 因为传入的mpoller 就是个指针 文件描述符的大小
  24. void **nodes_buf = (void **)calloc(params->max_open_files, sizeof (void *));
  25. unsigned int i;
  26. if (nodes_buf)
  27. {
  28. // 根据线程数据 创建相对于的poller 传入参数
  29. for (i = 0; i < mpoller->nthreads; i++)
  30. {
  31. mpoller->poller[i] = __poller_create(nodes_buf, params);
  32. if (!mpoller->poller[i])
  33. break;
  34. }
  35. // 如果创建的i 和线程数相同的话
  36. if (i == mpoller->nthreads)
  37. {
  38. // nodes_buf 指定
  39. mpoller->nodes_buf = nodes_buf;
  40. return 0;
  41. }
  42. // 如果创建失败,则进行销毁poller 和释放 nodes_buf
  43. while (i > 0)
  44. __poller_destroy(mpoller->poller[--i]);
  45. free(nodes_buf);
  46. }
  47. return -1;
  48. }
  49. // poller 创建 需要考虑创建成功 或者 失败
  50. mpoller_t *mpoller_create(const struct poller_params *params, size_t nthreads)
  51. {
  52. mpoller_t *mpoller;
  53. size_t size;
  54. if (nthreads == 0)
  55. nthreads = 1;
  56. // 根据线程分配 结构体偏移 到 poller 并增加 nthreads 个指针大小内存
  57. // poller_t *poller[1] 扩增 poller 数组大小
  58. size = offsetof(mpoller_t, poller) + nthreads * sizeof (void *);
  59. mpoller = (mpoller_t *)malloc(size);
  60. if (mpoller)
  61. {
  62. mpoller->nthreads = (unsigned int)nthreads;
  63. if (__mpoller_create(params, mpoller) >= 0)
  64. return mpoller;
  65. free(mpoller);
  66. }
  67. return NULL;
  68. }
  69. // poller 启动 启动成功 和启动失败的措施
  70. int mpoller_start(mpoller_t *mpoller)
  71. {
  72. size_t i;
  73. for (i = 0; i < mpoller->nthreads; i++)
  74. {
  75. if (poller_start(mpoller->poller[i]) < 0)
  76. break;
  77. }
  78. if (i == mpoller->nthreads)
  79. return 0;
  80. while (i > 0)
  81. poller_stop(mpoller->poller[--i]);
  82. return -1;
  83. }
  84. // poller 停止
  85. void mpoller_stop(mpoller_t *mpoller)
  86. {
  87. size_t i;
  88. for (i = 0; i < mpoller->nthreads; i++)
  89. poller_stop(mpoller->poller[i]);
  90. }
  91. // poller 销毁 并释放内存 和 mpoller 因为再创建的时候 mpoller_create 分配了该内存
  92. void mpoller_destroy(mpoller_t *mpoller)
  93. {
  94. size_t i;
  95. for (i = 0; i < mpoller->nthreads; i++)
  96. __poller_destroy(mpoller->poller[i]);
  97. free(mpoller->nodes_buf);
  98. free(mpoller);
  99. }

这个mpoller 就是用来管理 epoll 创建,几个线程就创建几个 poller ,整体比较简单,其中要注意的是对

c 语言的使用,如下;extern 外部函数 说明在其他地方已经实现

  1. extern poller_t *__poller_create(void **, const struct poller_params *);
  2. extern void __poller_destroy(poller_t *);

分配 max_open_files 个文件描述符大小空间

void **nodes_buf = (void **)calloc(params->max_open_files, sizeof (void *));

根据线程分配 结构体偏移 到 poller 并增加 nthreads 个指针大小内存 poller_t *poller[1] 扩增 poller 数组大小 , offsetof 的使用 用于 结构体变量的字节偏移

size = offsetof(mpoller_t, poller) + nthreads * sizeof (void *);

workflow 的代码非常值得我们学习和研究 ,你要了解 整个架构的是如何 设计,以及各个模块之间是如何配合的,希望我的理解可以对你有帮助,通过对开源的学习,可以提高自己能力,其余的会在其他短篇文章里面进行讲解,尽量更新。

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号