赞
踩
本篇开始介绍 workflow 源码 从里往外 进行了解 ,作为一个异步事件驱动服务器引擎,无论如何底层都是对事件的管理 和封装,本质就是一个事件收集器 ,可以这么理解,将io 事件 进行 收集回调 上成应用 ,简称”套娃“, workflow: C++并行计算与异步网络引擎 ⭐⭐⭐ (gitee.com)
- /*
- Copyright (c) 2019 Sogou, Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- Author: Xie Han (xiehan@sogou-inc.com)
- */
-
- #ifndef _MPOLLER_H_
- #define _MPOLLER_H_
-
- #include <stddef.h>
- #include "poller.h"
-
-
-
- /*
- * 1、poller ->>mpoller_t 管理
- * 2、结构体的别名
- */
- typedef struct __mpoller mpoller_t;
-
- #ifdef __cplusplus
- extern "C"
- {
- #endif
-
- mpoller_t *mpoller_create(const struct poller_params *params, size_t nthreads);
- int mpoller_start(mpoller_t *mpoller);
- void mpoller_stop(mpoller_t *mpoller);
- void mpoller_destroy(mpoller_t *mpoller);
-
- #ifdef __cplusplus
- }
- #endif
-
- struct __mpoller
- {
- void **nodes_buf;
- unsigned int nthreads;
- poller_t *poller[1];
- };
-
- static inline int mpoller_add(const struct poller_data *data, int timeout,
- mpoller_t *mpoller)
- {
- unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
- return poller_add(data, timeout, mpoller->poller[index]);
- }
-
- static inline int mpoller_del(int fd, mpoller_t *mpoller)
- {
- unsigned int index = (unsigned int)fd % mpoller->nthreads;
- return poller_del(fd, mpoller->poller[index]);
- }
-
- static inline int mpoller_mod(const struct poller_data *data, int timeout,
- mpoller_t *mpoller)
- {
- unsigned int index = (unsigned int)data->fd % mpoller->nthreads;
- return poller_mod(data, timeout, mpoller->poller[index]);
- }
-
- static inline int mpoller_set_timeout(int fd, int timeout, mpoller_t *mpoller)
- {
- unsigned int index = (unsigned int)fd % mpoller->nthreads;
- return poller_set_timeout(fd, timeout, mpoller->poller[index]);
- }
-
- static inline int mpoller_add_timer(const struct timespec *value, void *context,
- mpoller_t *mpoller)
- {
- static unsigned int n = 0;
- unsigned int index = n++ % mpoller->nthreads;
- return poller_add_timer(value, context, mpoller->poller[index]);
- }
-
- #endif
-
- /*
- Copyright (c) 2019 Sogou, Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- Author: Xie Han (xiehan@sogou-inc.com)
- */
-
- #include <stddef.h>
- #include <stdlib.h>
- #include "poller.h"
- #include "mpoller.h"
-
- extern poller_t *__poller_create(void **, const struct poller_params *);
- extern void __poller_destroy(poller_t *);
-
- static int __mpoller_create(const struct poller_params *params,
- mpoller_t *mpoller)
- {
- // 双指针 指向指针的指针 因为传入的mpoller 就是个指针 文件描述符的大小
- void **nodes_buf = (void **)calloc(params->max_open_files, sizeof (void *));
- unsigned int i;
-
- if (nodes_buf)
- {
- // 根据线程数据 创建相对于的poller 传入参数
- for (i = 0; i < mpoller->nthreads; i++)
- {
- mpoller->poller[i] = __poller_create(nodes_buf, params);
- if (!mpoller->poller[i])
- break;
- }
- // 如果创建的i 和线程数相同的话
- if (i == mpoller->nthreads)
- {
- // nodes_buf 指定
- mpoller->nodes_buf = nodes_buf;
- return 0;
- }
- // 如果创建失败,则进行销毁poller 和释放 nodes_buf
- while (i > 0)
- __poller_destroy(mpoller->poller[--i]);
-
- free(nodes_buf);
- }
-
- return -1;
- }
-
- // poller 创建 需要考虑创建成功 或者 失败
- mpoller_t *mpoller_create(const struct poller_params *params, size_t nthreads)
- {
- mpoller_t *mpoller;
- size_t size;
-
- if (nthreads == 0)
- nthreads = 1;
- // 根据线程分配 结构体偏移 到 poller 并增加 nthreads 个指针大小内存
- // poller_t *poller[1] 扩增 poller 数组大小
- size = offsetof(mpoller_t, poller) + nthreads * sizeof (void *);
- mpoller = (mpoller_t *)malloc(size);
- if (mpoller)
- {
- mpoller->nthreads = (unsigned int)nthreads;
- if (__mpoller_create(params, mpoller) >= 0)
- return mpoller;
-
- free(mpoller);
- }
-
- return NULL;
- }
-
- // poller 启动 启动成功 和启动失败的措施
- int mpoller_start(mpoller_t *mpoller)
- {
- size_t i;
-
- for (i = 0; i < mpoller->nthreads; i++)
- {
- if (poller_start(mpoller->poller[i]) < 0)
- break;
- }
-
- if (i == mpoller->nthreads)
- return 0;
-
- while (i > 0)
- poller_stop(mpoller->poller[--i]);
-
- return -1;
- }
-
- // poller 停止
- void mpoller_stop(mpoller_t *mpoller)
- {
- size_t i;
-
- for (i = 0; i < mpoller->nthreads; i++)
- poller_stop(mpoller->poller[i]);
- }
-
- // poller 销毁 并释放内存 和 mpoller 因为再创建的时候 mpoller_create 分配了该内存
- void mpoller_destroy(mpoller_t *mpoller)
- {
- size_t i;
-
- for (i = 0; i < mpoller->nthreads; i++)
- __poller_destroy(mpoller->poller[i]);
-
- free(mpoller->nodes_buf);
- free(mpoller);
- }
-
这个mpoller 就是用来管理 epoll 创建,几个线程就创建几个 poller ,整体比较简单,其中要注意的是对
c 语言的使用,如下;extern 外部函数 说明在其他地方已经实现
- extern poller_t *__poller_create(void **, const struct poller_params *);
- extern void __poller_destroy(poller_t *);
分配 max_open_files 个文件描述符大小空间
void **nodes_buf = (void **)calloc(params->max_open_files, sizeof (void *));
根据线程分配 结构体偏移 到 poller 并增加 nthreads 个指针大小内存 poller_t *poller[1] 扩增 poller 数组大小 , offsetof 的使用 用于 结构体变量的字节偏移
size = offsetof(mpoller_t, poller) + nthreads * sizeof (void *);
workflow 的代码非常值得我们学习和研究 ,你要了解 整个架构的是如何 设计,以及各个模块之间是如何配合的,希望我的理解可以对你有帮助,通过对开源的学习,可以提高自己能力,其余的会在其他短篇文章里面进行讲解,尽量更新。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。