当前位置:   article > 正文

sonic orch调度系统之----orchagent

orchagent

 sonic核心守护线程orchagent以orch为单位进行资源管理,一个orch包含了一组相似的资源;orchagent调度系统以Executor为调度单位,调度实体有Consumer,ExecutableTimer等,本文分析一下sonic调度细节。

class OrchDaemon

orchagent以OrchDaemon作为核心类进行描述。

  1. class OrchDaemon
  2. {
  3. public:
  4. OrchDaemon(DBConnector *, DBConnector *, DBConnector *);
  5. ~OrchDaemon();
  6. bool init();//初始化进程
  7. void start();//启动调度系统
  8. private:
  9. //连接了三个数据库
  10. DBConnector *m_applDb;
  11. DBConnector *m_configDb;
  12. DBConnector *m_stateDb;
  13. //包含所有的orch
  14. std::vector<Orch *> m_orchList;
  15. //创建的select多路异步IO控制块
  16. Select *m_select;
  17. //将asic_db的pipe进行flush,不在等待。
  18. void flush();
  19. };

bool OrchDaemon::init()

初始化orchagent执行环境。

  1. bool OrchDaemon::init()
  2. {
  3. SWSS_LOG_ENTER();
  4. ......
  5. //连接数据库
  6. TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
  7. TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
  8. TableConnector stateDbLagTable(m_stateDb, STATE_LAG_TABLE_NAME);
  9. vector<TableConnector> acl_table_connectors = {
  10. confDbAclTable,
  11. confDbAclRuleTable,
  12. stateDbLagTable
  13. };
  14. ......
  15. //收集对应的orch
  16. m_orchList = { switch_orch, gCrmOrch, gBufferOrch, gPortsOrch, intfs_orch, gNeighOrch, gRouteOrch, copp_orch, tunnel_decap_orch, qos_orch, mirror_orch, gAclOrch, gFdbOrch, vrf_orch };
  17. //创建多路事件控制块
  18. m_select = new Select();
  19. ......
  20. //除了上面的orch外,其它代码还添加了一些orch,这里不再列出
  21. return true;
  22. }

void OrchDaemon::flush()

将asic_db的生产者的pipeline清空

  1. /* Flush redis through sairedis interface */
  2. void OrchDaemon::flush()
  3. {
  4. SWSS_LOG_ENTER();
  5. sai_attribute_t attr;
  6. attr.id = SAI_REDIS_SWITCH_ATTR_FLUSH;
  7. sai_status_t status = sai_switch_api->set_switch_attribute(gSwitchId, &attr);
  8. if (status != SAI_STATUS_SUCCESS)
  9. {
  10. SWSS_LOG_ERROR("Failed to flush redis pipeline %d", status);
  11. exit(EXIT_FAILURE);
  12. }
  13. }

void OrchDaemon::start()

  1. void OrchDaemon::start()
  2. {
  3. SWSS_LOG_ENTER();
  4. //遍历每一个orch,将每一个orch中的所有关心的事件加入epoll中,基本一个Executor为一个事件
  5. for (Orch *o : m_orchList)
  6. {
  7. m_select->addSelectables(o->getSelectables());
  8. }
  9. //进入dead loop
  10. while (true)
  11. {
  12. Selectable *s;
  13. int ret;
  14. // 进行epoll阻塞监听
  15. ret = m_select->select(&s, SELECT_TIMEOUT);
  16. // 错误事件,继续监听
  17. if (ret == Select::ERROR)
  18. {
  19. SWSS_LOG_NOTICE("Error: %s!\n", strerror(errno));
  20. continue;
  21. }
  22. //超时事件
  23. if (ret == Select::TIMEOUT)
  24. {
  25. /* Let sairedis to flush all SAI function call to ASIC DB.
  26. * Normally the redis pipeline will flush when enough request
  27. * accumulated. Still it is possible that small amount of
  28. * requests live in it. When the daemon has nothing to do, it
  29. * is a good chance to flush the pipeline
  30. * 确保redis-pipeline中的少量请求在10秒后能够得到处理,不在积累请求
  31. */
  32. flush();
  33. continue;
  34. }
  35. //获取触发epoll事件的Executor
  36. auto *c = (Executor *)s;
  37. //对于Consumer来说,执行数据库操作,将redis中的通知转换到m_tosync中,并且执行以下m_tosync中的task
  38. c->execute();
  39. /* After each iteration, periodically check all m_toSync map to
  40. * execute all the remaining tasks that need to be retried. */
  41. /* TODO: Abstract Orch class to have a specific todo list */
  42. // 执行其它的orch中遗留的任务
  43. for (Orch *o : m_orchList)
  44. o->doTask();
  45. }
  46. }

实例分析

下面我们以orchagent处理的最多的事件:ConsumerStateTable来分析一下orchagent调度系统。

ConsumerStateTable即orchagent订阅的app_db事件。格式如下所示:

  1. "SADD" "INTF_TABLE_KEY_SET" "PortChannel1:1.1.1.1/8" #在集合INTF_TABLE_KEY_SET中增加一个key
  2. "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "scope" "global"
  3. "HSET" "INTF_TABLE:PortChannel1:1.1.1.1/8" "family" "IPv4"
  4. "PUBLISH" "INTF_TABLE_CHANNEL" "G"
  • 当生产者发送命令"PUBLISH" "INTF_TABLE_CHANNEL" "G",orchagent从epoll中被唤醒,然后从对应的redis客户端句柄中调用RedisSelect::readData()进行数据读取该命令的应答,应答如下所示:
  1. 1) "message"
  2. 2) "INTF_TABLE_CHANNEL"
  3. 3) "G"
  • 每收到一个应答都会对RedisSelect::m_queueLength进行加1,该应答只有一个信息"G",只起到通知作用。所以如果生产者同时写入了大量的app_db事件,某一个epoll唤醒,RedisSelect::readData()可以读出大量的应答,从而导致m_queueLength大于1。
  • 读完数据后将该事件Selectable添加到Select:m_ready中。同时通过调用RedisSelect::updateAfterRead()对RedisSelect::m_queueLength进行减1,这里只减去了1,而没有根据实际处理的情况减去对应的值,这里存在增加多个值减去一个值的情况。只要m_queueLength不为1,就不会将Selectable从Select:m_ready删除。
  • Select::select函数会返回Select:m_ready中所有的Selectable。
  • 遍历每一个selectable,在函数Consumer::execute()中调用TableEntryPoppable::pops使用脚本consumer_state_table_pops.lua处理INTF_TABLE_KEY_SET中的key,每次最多处理128个key。
  1. local ret = {}
  2. local keys = redis.call('SPOP', KEYS[1], ARGV[1])--一次处理128key
  3. local n = table.getn(keys)
  4. for i = 1, n do
  5. local key = keys[i]
  6. local values = redis.call('HGETALL', KEYS[2] .. key)
  7. table.insert(ret, {key, values})
  8. end
  9. return ret

脚本返回的内容将会是:

  1. INTF_TABLE:PortChannel1:1.1.1.1/8:{
  2. "scope":"global",
  3. "family": "IPv4"
  4. }

pops然后对上面的内容进行加工成如下格式:

std::tuple<std::string, std::string, std::vector<FieldValueTuple> >

即最后返回:

"SET""INTF_TABLE:PortChannel1:1.1.1.1/8", <"scope":"global","family""IPv4">
  • Consumer::execute()会对前面返回的值添加到Consumer::m_toSync,这里会进行合并。
  • 最后OrchDaemon::start()函数会执行每一个orch的Consumer::m_toSync中的task。

sonic调度系统的缺陷

sonic调度系统过于简单,无法处理大规模,逻辑复杂的业务,效率非常低下。

  1. 当超规格下发配置的时候,会导致m_toSync中因为资源不足而驻留大量的task。OrchDaemon::start()每一次事件触发都会遍历所有orch的m_toSync中的task,造成无效task频繁执行,引发orchagent发生震荡。必须给Consumer增加一些标记,表明当前Consumer处于资源不足状态,暂不执行m_toSync中的task
  2. 当大规模下发配置的时候,如果本配置的依赖还没有下发,会导致m_toSync中因为依赖不满足而驻留大量的task。引发orchagent震荡问题。
  3. 当大规模下删除配置的时候,如果本配置引用还没有删除,会导致m_toSync中因为被引用不能删除而驻留大量的task。引发orchagent震荡问题。
  4. orchagent在处理task的时候没有考虑事件处理顺序,应该首先执行DEL操作,然后再执行SET操作。对于DEL操作,应该先删除低优先级的Consumer,对于SET操作来说,应该先处理高优先级的Consumer。两者顺序相反。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/765777
推荐阅读
相关标签
  

闽ICP备14008679号