赞
踩
sonic核心守护线程orchagent以orch为单位进行资源管理,一个orch包含了一组相似的资源;orchagent调度系统以Executor为调度单位,调度实体有Consumer,ExecutableTimer等,本文分析一下sonic调度细节。
orchagent以OrchDaemon作为核心类进行描述。
- class OrchDaemon
- {
- public:
- OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
- ~OrchDaemon();
-
- bool init();//初始化进程
- void start();//启动调度系统
- private:
- //连接了三个数据库
- DBConnector *m_applDb;
- DBConnector *m_configDb;
- DBConnector *m_stateDb;
- //包含所有的orch
- std::vector<Orch *> m_orchList;
- //创建的select多路异步IO控制块
- Select *m_select;
- //将asic_db的pipe进行flush,不在等待。
- void flush();
- };

初始化orchagent执行环境。
- bool OrchDaemon::init()
- {
- SWSS_LOG_ENTER();
-
- ......
-
- //连接数据库
- TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
- TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
- TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);
-
- vector<TableConnector> acl_table_connectors = {
- confDbAclTable,
- confDbAclRuleTable,
- stateDbLagTable
- };
-
- ......
-
- //收集对应的orch
- m_orchList = { switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, mirror_orch, gAclOrch, gFdbOrch, vrf_orch };
- //创建多路事件控制块
- m_select = new Select();
-
- ......
- //除了上面的orch外,其它代码还添加了一些orch,这里不再列出
-
- return true;
- }

将asic_db的生产者的pipeline清空
- /* Flush redis through sairedis interface */
- void OrchDaemon::flush()
- {
- SWSS_LOG_ENTER();
-
- sai_attribute_t attr;
- attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
- sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
- if (status != SAI_STATUS_SUCCESS)
- {
- SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
- exit(EXIT_FAILURE);
- }
- }
- void OrchDaemon::start()
- {
- SWSS_LOG_ENTER();
- //遍历每一个orch,将每一个orch中的所有关心的事件加入epoll中,基本一个Executor为一个事件
- for (Orch *o : m_orchList)
- {
- m_select->addSelectables(o->getSelectables());
- }
- //进入dead loop
- while (true)
- {
- Selectable *s;
- int ret;
- // 进行epoll阻塞监听
- ret = m_select->select(&s, SELECT_TIMEOUT);
- // 错误事件,继续监听
- if (ret == Select::ERROR)
- {
- SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
- continue;
- }
- //超时事件
- if (ret == Select::TIMEOUT)
- {
- /* Let sairedis to flush all SAI function call to ASIC DB.
- * Normally the redis pipeline will flush when enough request
- * accumulated. Still it is possible that small amount of
- * requests live in it. When the daemon has nothing to do, it
- * is a good chance to flush the pipeline
- * 确保redis-pipeline中的少量请求在10秒后能够得到处理,不在积累请求
- */
- flush();
- continue;
- }
- //获取触发epoll事件的Executor
- auto *c = (Executor *)s;
- //对于Consumer来说,执行数据库操作,将redis中的通知转换到m_tosync中,并且执行以下m_tosync中的task
- c->execute();
-
- /* After each iteration, periodically check all m_toSync map to
- * execute all the remaining tasks that need to be retried. */
-
- /* TODO: Abstract Orch class to have a specific todo list */
- // 执行其它的orch中遗留的任务
- for (Orch *o : m_orchList)
- o->doTask();
-
- }
- }

下面我们以orchagent处理的最多的事件:ConsumerStateTable来分析一下orchagent调度系统。
ConsumerStateTable即orchagent订阅的app_db事件。格式如下所示:
- "SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8" #在集合INTF_TABLE_KEY_SET中增加一个key
- "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global"
- "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
- "PUBLISH" "INTF_TABLE_CHANNEL" "G"
- 1) "message"
- 2) "INTF_TABLE_CHANNEL"
- 3) "G"
- local ret = {}
- local keys = redis.call('SPOP', KEYS[1], ARGV[1])--一次处理128个key
- local n = table.getn(keys)
- for i = 1, n do
- local key = keys[i]
- local values = redis.call('HGETALL', KEYS[2] .. key)
- table.insert(ret, {key, values})
- end
- return ret
脚本返回的内容将会是:
- INTF_TABLE:PortChannel1:1.1.1.1/8:{
- "scope":"global",
- "family": "IPv4"
- }
pops然后对上面的内容进行加工成如下格式:
std::tuple<std::string, std::string, std::vector<FieldValueTuple> >
即最后返回:
"SET", "INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global","family": "IPv4">
sonic调度系统过于简单,无法处理大规模,逻辑复杂的业务,效率非常低下。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。