当前位置:   article > 正文

sonic orch调度系统之----orch_orch::parsereference

orch::parsereference

​ sonic orchagent线程的调度最小单位是Consumer。Consumer是在epoll事件Selectable的基础上的进一步封装,每一次发生epoll事件会触发orchagent进行一次调度。orch是资源的集合,一个orch可以包含多个Consumer,比如acl orch会监听多个redistable。

class Executor

  1. // Design assumption
  2. // 1. one Orch can have one or more Executor
  3. // 2. one Executor must belong to one and only one Orch
  4. // 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor
  5. // 设计假设:
  6. // 1. 一个orch可以拥有一个或者多个Executor
  7. // 2. 一个Executor必须属于一个orch而且仅仅属于一个orch
  8. // 3. Executor有一个指针指向一个new出来的Selectable结构,必须在析构函数中将其删除,否则会泄漏
  9. class Executor : public Selectable
  10. {
  11. public:
  12. Executor(Selectable *selectable, Orch *orch)
  13. : m_selectable(selectable)
  14. , m_orch(orch)
  15. {
  16. }
  17. virtual ~Executor() { delete m_selectable; }
  18. // Decorating Selectable
  19. int getFd() override { return m_selectable->getFd(); }
  20. void readData() override { m_selectable->readData(); }
  21. bool hasCachedData() override { return m_selectable->hasCachedData(); }
  22. bool initializedWithData() override { return m_selectable->initializedWithData(); }
  23. void updateAfterRead() override { m_selectable->updateAfterRead(); }
  24. Orch * getorch() { return m_orch; }
  25. // Disable copying
  26. Executor(const Executor&) = delete;
  27. Executor& operator=(const Executor&) = delete;
  28. // Execute on event happening
  29. // execute执行事件,drain是一个辅助函数
  30. virtual void execute() { }
  31. virtual void drain() { }
  32. protected:
  33. Selectable *m_selectable;//指向new出来的Selectable
  34. Orch *m_orch;//指向一个orch
  35. // Get the underlying selectable 获取指向的Selectable
  36. Selectable *getSelectable() const { return m_selectable; }
  37. };

class Executor只是一个中间的派生类,orch直接使用的是class Consumer和class ExecutableTimer。

class Consumer

消费者类一般用于处理app_db的订阅事件,对于asic_db一般是处理syncd的应答事件。

  1. typedef std::pair<std::string, std::string> FieldValueTuple;
  2. #define fvField std::get<0>
  3. #define fvValue std::get<1>
  4. typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple;
  5. #define kfvKey std::get<0>
  6. #define kfvOp std::get<1>
  7. #define kfvFieldsValues std::get<2>
  8. typedef map<string, KeyOpFieldsValuesTuple> SyncMap;
  9. class Consumer : public Executor {
  10. public:
  11. Consumer(TableConsumable *select, Orch *orch)
  12. : Executor(select, orch)
  13. {
  14. }
  15. TableConsumable *getConsumerTable() const
  16. {
  17. return static_cast<TableConsumable *>(getSelectable());
  18. }
  19. string getTableName() const
  20. {
  21. return getConsumerTable()->getTableName();
  22. }
  23. // 事物执行
  24. void execute();
  25. void drain();
  26. /* Store the latest 'golden' status */
  27. // TODO: hide?
  28. SyncMap m_toSync;
  29. };

void Consumer::execute()

epoll事件触发后,需要调用该函数从数据库中读取出指定key的内容,将其加工后存放在m_toSync中,供后续处理。

  1. void Consumer::execute()
  2. {
  3. SWSS_LOG_ENTER();
  4. std::deque<KeyOpFieldsValuesTuple> entries;
  5. //调用pops函数,从redis数据库中读取数据,返回KeyOpFieldsValuesTuple结构
  6. getConsumerTable()->pops(entries);
  7. /* Nothing popped */
  8. if (entries.empty())
  9. {
  10. return;
  11. }
  12. // 遍历每一个事件
  13. for (auto& entry: entries)
  14. {
  15. string key = kfvKey(entry);
  16. string op = kfvOp(entry);
  17. /* Record incoming tasks 记录事件 */
  18. if (gSwssRecord)
  19. {
  20. Orch::recordTuple(*this, entry);
  21. }
  22. /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
  23. // 在这里进行一次合并,对于删除事件,直接覆盖
  24. if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
  25. {
  26. m_toSync[key] = entry;
  27. }
  28. /* If an old task is still there, we combine the old task with new task */
  29. /* */
  30. else
  31. {
  32. KeyOpFieldsValuesTuple existing_data = m_toSync[key];
  33. auto new_values = kfvFieldsValues(entry);
  34. auto existing_values = kfvFieldsValues(existing_data);
  35. //遍历每一个新的值
  36. for (auto it : new_values)
  37. {
  38. string field = fvField(it);
  39. string value = fvValue(it);
  40. auto iu = existing_values.begin();
  41. while (iu != existing_values.end())//遍历每一个旧的值
  42. {
  43. string ofield = fvField(*iu);
  44. if (field == ofield)//相同的域,将老的值覆盖,这里应该跳出while,代码效率较差
  45. iu = existing_values.erase(iu);
  46. else
  47. iu++;
  48. }
  49. /* 将新的值添加进去 */
  50. existing_values.push_back(FieldValueTuple(field, value));
  51. }
  52. m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
  53. }
  54. }
  55. //执行所有整理好的任务。
  56. drain();
  57. }

假设有一个task的键值对如下:

  1. key=test;op=set;value={
  2. A:a,
  3. B:b,
  4. C:c,
  5. }

第一次触发任务是在APP_DB中写入了:

  1. key=test;op=set;value={
  2. A:a,
  3. B:b
  4. }

加入orchagent只是将该任务读取到了m_toSync中,由于某种原因没有执行完该任务,依然驻留在m_toSync中。第二次写入了:

  1. key=test;op=set;value={
  2. B:b1,
  3. C:c
  4. }

那么经过execute函数后m_toSync中将会是:

  1. key=test;op=set;value={
  2. A:a,
  3. B:b1,
  4. C:c
  5. }

void Consumer::drain()

执行m_toSync中的任务。

  1. void Consumer::drain()
  2. {
  3. if (!m_toSync.empty())
  4. m_orch->doTask(*this);
  5. }

class Orch

  1. class Orch
  2. {
  3. public:
  4. //每个orch都会连接到数据库,以及其需要订阅的表名,和订阅该表产生的事件的优先级
  5. //以默认优先级订阅一个table
  6. Orch(DBConnector *db, const string tableName, int pri = default_orch_pri);
  7. //以默认优先级订阅多个table
  8. Orch(DBConnector *db, const vector<string> &tableNames);
  9. //订阅多个table,指明每个table的优先级
  10. Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri);
  11. //连接多个数据库
  12. Orch(const vector<TableConnector>& tables);
  13. virtual ~Orch();
  14. // 获取该orch的所有epoll事件
  15. vector<Selectable*> getSelectables();
  16. /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
  17. // 执行该orch中所有的consumers中的m_sync中的任务
  18. void doTask();
  19. /* Run doTask against a specific executor */
  20. // 任务的来源可以是consumer,NotificationConsumer,SelectableTimer
  21. virtual void doTask(Consumer &consumer) = 0;
  22. virtual void doTask(NotificationConsumer &consumer) { }
  23. virtual void doTask(SelectableTimer &timer) { }
  24. /* TODO: refactor recording */
  25. static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
  26. protected:
  27. // 消费者map,一个orch可以订阅多个tablekey为tableName,value为Executor
  28. ConsumerMap m_consumerMap;
  29. // 与调试相关
  30. static void logfileReopen();
  31. string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
  32. ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&);
  33. bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
  34. bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name);
  35. ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&);
  36. /* Note: consumer will be owned by this class */
  37. // 内部函数添加一个Executor,给addConsumer使用
  38. void addExecutor(string executorName, Executor* executor);
  39. Executor *getExecutor(string executorName);
  40. private:
  41. // 添加一个消费者
  42. void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri);
  43. };

void Orch::addConsumer(......)

  1. void Orch::addExecutor(string executorName, Executor* executor)
  2. {
  3. m_consumerMap.emplace(std::piecewise_construct,
  4. std::forward_as_tuple(executorName),
  5. std::forward_as_tuple(executor));
  6. }
  7. //添加一个消费者
  8. void Orch::addConsumer(DBConnector *db, string tableName, int pri)
  9. {
  10. if (db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB)
  11. {
  12. addExecutor(tableName, new Consumer(new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri), this));
  13. }
  14. else
  15. {
  16. addExecutor(tableName, new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this));
  17. }
  18. }

void Orch::doTask(......)

执行本orch中的每一个消费者m_toSync中的task,不管该task是否本次从redis中读取还是以前未处理完毕的。

  1. void Orch::doTask()
  2. {
  3. for(auto &it : m_consumerMap)
  4. {
  5. it.second->drain();
  6. }
  7. }

class Orch2

orch2是在orch的基础上的一个封装,代码的可读性增强。

  1. class Orch2 : public Orch
  2. {
  3. public:
  4. Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri)
  5. : Orch(db, tableName, pri), request_(request)
  6. {
  7. }
  8. protected:
  9. virtual void doTask(Consumer& consumer);
  10. virtual bool addOperation(const Request& request)=0;
  11. virtual bool delOperation(const Request& request)=0;
  12. private:
  13. Request& request_;
  14. };

 

void Orch2::doTask

  1. void Orch2::doTask(Consumer &consumer)
  2. {
  3. SWSS_LOG_ENTER();
  4. auto it = consumer.m_toSync.begin();
  5. while (it != consumer.m_toSync.end())
  6. {
  7. bool erase_from_queue = true;
  8. try
  9. {
  10. request_.parse(it->second);
  11. auto op = request_.getOperation();
  12. if (op == SET_COMMAND)
  13. {
  14. erase_from_queue = addOperation(request_);
  15. }
  16. else if (op == DEL_COMMAND)
  17. {
  18. erase_from_queue = delOperation(request_);
  19. }
  20. else
  21. {
  22. SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str());
  23. }
  24. }
  25. catch (const std::invalid_argument& e)
  26. {
  27. SWSS_LOG_ERROR("Parse error: %s", e.what());
  28. }
  29. catch (const std::logic_error& e)
  30. {
  31. SWSS_LOG_ERROR("Logic error: %s", e.what());
  32. }
  33. catch (const std::exception& e)
  34. {
  35. SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what());
  36. }
  37. catch (...)
  38. {
  39. SWSS_LOG_ERROR("Unknown exception was catched in the request parser");
  40. }
  41. request_.clear();
  42. //执行成功,那么从m_tosync中删除,否则执行下一个task
  43. if (erase_from_queue)
  44. {
  45. it = consumer.m_toSync.erase(it);
  46. }
  47. else
  48. {
  49. ++it;
  50. }
  51. }
  52. }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号