赞
踩
sonic orchagent线程的调度最小单位是Consumer。Consumer是在epoll事件Selectable的基础上的进一步封装,每一次发生epoll事件会触发orchagent进行一次调度。orch是资源的集合,一个orch可以包含多个Consumer,比如acl orch会监听多个redistable。
- // Design assumption
- // 1. one Orch can have one or more Executor
- // 2. one Executor must belong to one and only one Orch
- // 3. Executor will hold an pointer to new-ed selectable, and delete it during dtor
- // 设计假设:
- // 1. 一个orch可以拥有一个或者多个Executor
- // 2. 一个Executor必须属于一个orch而且仅仅属于一个orch
- // 3. Executor有一个指针指向一个new出来的Selectable结构,必须在析构函数中将其删除,否则会泄漏
- class Executor : public Selectable
- {
- public:
- Executor(Selectable *selectable, Orch *orch)
- : m_selectable(selectable)
- , m_orch(orch)
- {
- }
-
- virtual ~Executor() { delete m_selectable; }
-
- // Decorating Selectable
- int getFd() override { return m_selectable->getFd(); }
- void readData() override { m_selectable->readData(); }
- bool hasCachedData() override { return m_selectable->hasCachedData(); }
- bool initializedWithData() override { return m_selectable->initializedWithData(); }
- void updateAfterRead() override { m_selectable->updateAfterRead(); }
- Orch * getorch() { return m_orch; }
- // Disable copying
- Executor(const Executor&) = delete;
- Executor& operator=(const Executor&) = delete;
-
- // Execute on event happening
- // execute执行事件,drain是一个辅助函数
- virtual void execute() { }
- virtual void drain() { }
-
- protected:
- Selectable *m_selectable;//指向new出来的Selectable
- Orch *m_orch;//指向一个orch
-
- // Get the underlying selectable 获取指向的Selectable
- Selectable *getSelectable() const { return m_selectable; }
- };
class Executor只是一个中间的派生类,orch直接使用的是class Consumer和class ExecutableTimer。
消费者类一般用于处理app_db的订阅事件,对于asic_db一般是处理syncd的应答事件。
- typedef std::pair<std::string, std::string> FieldValueTuple;
- #define fvField std::get<0>
- #define fvValue std::get<1>
- typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple;
- #define kfvKey std::get<0>
- #define kfvOp std::get<1>
- #define kfvFieldsValues std::get<2>
- typedef map<string, KeyOpFieldsValuesTuple> SyncMap;
-
- class Consumer : public Executor {
- public:
- Consumer(TableConsumable *select, Orch *orch)
- : Executor(select, orch)
- {
- }
-
- TableConsumable *getConsumerTable() const
- {
- return static_cast<TableConsumable *>(getSelectable());
- }
-
- string getTableName() const
- {
- return getConsumerTable()->getTableName();
- }
- // 事物执行
- void execute();
- void drain();
-
- /* Store the latest 'golden' status */
- // TODO: hide?
- SyncMap m_toSync;
- };
epoll事件触发后,需要调用该函数从数据库中读取出指定key的内容,将其加工后存放在m_toSync中,供后续处理。
- void Consumer::execute()
- {
- SWSS_LOG_ENTER();
-
- std::deque<KeyOpFieldsValuesTuple> entries;
- //调用pops函数,从redis数据库中读取数据,返回KeyOpFieldsValuesTuple结构
- getConsumerTable()->pops(entries);
-
- /* Nothing popped */
- if (entries.empty())
- {
- return;
- }
- // 遍历每一个事件
- for (auto& entry: entries)
- {
- string key = kfvKey(entry);
- string op = kfvOp(entry);
-
- /* Record incoming tasks 记录事件 */
- if (gSwssRecord)
- {
- Orch::recordTuple(*this, entry);
- }
-
- /* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
- // 在这里进行一次合并,对于删除事件,直接覆盖
- if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
- {
- m_toSync[key] = entry;
- }
- /* If an old task is still there, we combine the old task with new task */
- /* */
- else
- {
- KeyOpFieldsValuesTuple existing_data = m_toSync[key];
-
- auto new_values = kfvFieldsValues(entry);
- auto existing_values = kfvFieldsValues(existing_data);
-
- //遍历每一个新的值
- for (auto it : new_values)
- {
- string field = fvField(it);
- string value = fvValue(it);
-
- auto iu = existing_values.begin();
- while (iu != existing_values.end())//遍历每一个旧的值
- {
- string ofield = fvField(*iu);
- if (field == ofield)//相同的域,将老的值覆盖,这里应该跳出while,代码效率较差
- iu = existing_values.erase(iu);
- else
- iu++;
- }
- /* 将新的值添加进去 */
- existing_values.push_back(FieldValueTuple(field, value));
- }
- m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
- }
- }
- //执行所有整理好的任务。
- drain();
- }
假设有一个task的键值对如下:
- key=test;op=set;value={
- A:a,
- B:b,
- C:c,
- }
第一次触发任务是在APP_DB中写入了:
- key=test;op=set;value={
- A:a,
- B:b
- }
加入orchagent只是将该任务读取到了m_toSync中,由于某种原因没有执行完该任务,依然驻留在m_toSync中。第二次写入了:
- key=test;op=set;value={
- B:b1,
- C:c
- }
那么经过execute函数后m_toSync中将会是:
- key=test;op=set;value={
- A:a,
- B:b1,
- C:c
- }
执行m_toSync中的任务。
- void Consumer::drain()
- {
- if (!m_toSync.empty())
- m_orch->doTask(*this);
- }
- class Orch
- {
- public:
- //每个orch都会连接到数据库,以及其需要订阅的表名,和订阅该表产生的事件的优先级
-
- //以默认优先级订阅一个table
- Orch(DBConnector *db, const string tableName, int pri = default_orch_pri);
- //以默认优先级订阅多个table
- Orch(DBConnector *db, const vector<string> &tableNames);
- //订阅多个table,指明每个table的优先级
- Orch(DBConnector *db, const vector<table_name_with_pri_t> &tableNameWithPri);
- //连接多个数据库
- Orch(const vector<TableConnector>& tables);
- virtual ~Orch();
- // 获取该orch的所有epoll事件
- vector<Selectable*> getSelectables();
-
- /* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
- // 执行该orch中所有的consumers中的m_sync中的任务
- void doTask();
-
- /* Run doTask against a specific executor */
- // 任务的来源可以是consumer,NotificationConsumer,SelectableTimer
- virtual void doTask(Consumer &consumer) = 0;
- virtual void doTask(NotificationConsumer &consumer) { }
- virtual void doTask(SelectableTimer &timer) { }
-
- /* TODO: refactor recording */
- static void recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
- protected:
- // 消费者map,一个orch可以订阅多个table,key为tableName,value为Executor
- ConsumerMap m_consumerMap;
- // 与调试相关
- static void logfileReopen();
- string dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple);
- ref_resolve_status resolveFieldRefValue(type_map&, const string&, KeyOpFieldsValuesTuple&, sai_object_id_t&);
- bool parseIndexRange(const string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
- bool parseReference(type_map &type_maps, string &ref, string &table_name, string &object_name);
- ref_resolve_status resolveFieldRefArray(type_map&, const string&, KeyOpFieldsValuesTuple&, vector<sai_object_id_t>&);
-
- /* Note: consumer will be owned by this class */
- // 内部函数添加一个Executor,给addConsumer使用
- void addExecutor(string executorName, Executor* executor);
- Executor *getExecutor(string executorName);
- private:
- // 添加一个消费者
- void addConsumer(DBConnector *db, string tableName, int pri = default_orch_pri);
- };
- void Orch::addExecutor(string executorName, Executor* executor)
- {
- m_consumerMap.emplace(std::piecewise_construct,
- std::forward_as_tuple(executorName),
- std::forward_as_tuple(executor));
- }
- //添加一个消费者
- void Orch::addConsumer(DBConnector *db, string tableName, int pri)
- {
- if (db->getDbId() == CONFIG_DB || db->getDbId() == STATE_DB)
- {
- addExecutor(tableName, new Consumer(new SubscriberStateTable(db, tableName, TableConsumable::DEFAULT_POP_BATCH_SIZE, pri), this));
- }
- else
- {
- addExecutor(tableName, new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this));
- }
- }
执行本orch中的每一个消费者m_toSync中的task,不管该task是否本次从redis中读取还是以前未处理完毕的。
- void Orch::doTask()
- {
- for(auto &it : m_consumerMap)
- {
- it.second->drain();
- }
- }
orch2是在orch的基础上的一个封装,代码的可读性增强。
- class Orch2 : public Orch
- {
- public:
- Orch2(DBConnector *db, const std::string& tableName, Request& request, int pri=default_orch_pri)
- : Orch(db, tableName, pri), request_(request)
- {
- }
-
- protected:
- virtual void doTask(Consumer& consumer);
-
- virtual bool addOperation(const Request& request)=0;
- virtual bool delOperation(const Request& request)=0;
-
- private:
- Request& request_;
- };
- void Orch2::doTask(Consumer &consumer)
- {
- SWSS_LOG_ENTER();
-
- auto it = consumer.m_toSync.begin();
- while (it != consumer.m_toSync.end())
- {
- bool erase_from_queue = true;
- try
- {
- request_.parse(it->second);
-
- auto op = request_.getOperation();
- if (op == SET_COMMAND)
- {
- erase_from_queue = addOperation(request_);
- }
- else if (op == DEL_COMMAND)
- {
- erase_from_queue = delOperation(request_);
- }
- else
- {
- SWSS_LOG_ERROR("Wrong operation. Check RequestParser: %s", op.c_str());
- }
- }
- catch (const std::invalid_argument& e)
- {
- SWSS_LOG_ERROR("Parse error: %s", e.what());
- }
- catch (const std::logic_error& e)
- {
- SWSS_LOG_ERROR("Logic error: %s", e.what());
- }
- catch (const std::exception& e)
- {
- SWSS_LOG_ERROR("Exception was catched in the request parser: %s", e.what());
- }
- catch (...)
- {
- SWSS_LOG_ERROR("Unknown exception was catched in the request parser");
- }
- request_.clear();
- //执行成功,那么从m_tosync中删除,否则执行下一个task
- if (erase_from_queue)
- {
- it = consumer.m_toSync.erase(it);
- }
- else
- {
- ++it;
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。