当前位置:   article > 正文

OceanBase 配置项&系统变量实现及应用详解(3):新增配置项的方法

OceanBase 配置项&系统变量实现及应用详解(3):新增配置项的方法

本专题的第一篇文章,配置项的定义及使用方法,详细阐述了配置项的基础用法。对于那些对源码抱有浓厚兴趣的同学来说,或许还希望深入了解配置项的实现原理,甚至渴望亲自添加新的配置项,以满足个性化的功能需求。

本文通过剖析“如何新增配置项”这一话题,并结合配置项源码的具体实现,来详细讲解配置项的定义、初始化、内部访问及同步机制。

如何新增配置项?

要新增一个配置项,首先在 src/share/parameter/ob_parameter_seed.ipp 文件中,按照下面的格式定义配置项。

  1. // 定义一个集群级别的、INT类型的配置项,动态生效
  2. DEF_INT(cpu_count, OB_CLUSTER_PARAMETER, "0", "[0,]",
  3. "the number of CPU\\'s in the system. "
  4. "If this parameter is set to zero, the number will be set according to sysconf; "
  5. "otherwise, this parameter is used. Range: [0,+∞) in integer",
  6. ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
  7. // 定义一个租户级别的、TIME类型的配置项,动态生效,并附带一个合法性检查类 ObConfigStaleTimeChecker
  8. DEF_TIME_WITH_CHECKER(max_stale_time_for_weak_consistency, OB_TENANT_PARAMETER, "5s",
  9. common::ObConfigStaleTimeChecker,
  10. "[5s,)",
  11. "the max data stale time that cluster weak read version behind current timestamp,"
  12. "no smaller than weak_read_version_refresh_interval, range: [5s, +∞)",
  13. ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

其中每个参数的含义如下:

参数说明举例
配置项宏定义配置的宏,可以选择附带合法性检查函数DEF_XXX、DEF_XXX_WITH_CHECKER
配置项名配置项的名字一般格式:xxx_xxx_xxx
生效范围租户配置项 or 集群配置项OB_CLUSTER_PARAMETER、OB_TENANT_PARAMETER
默认值默认值必须在取值范围中"0", "True", "100ms", "20GB", "random"
取值范围布尔类型和字符串类型不需要取值范围"[0,]", "[0M,]", "[1s, 180s]"
描述介绍该配置项的功能和注意事项"enable xxx"
配置项属性① Section:所属模块;
② Source:来源;
③ EditLevel:生效方式;
① OBSERVER、TENANT、TRANS
② DEFAULT
③ DYNAMIC_EFFECTIVE、STATIC_EFFECTIVE

DEF_XXX 宏最终展开是一个类的声明,并同时定义一个对象,对象名就是配置项名。

比如 DEF_TIME_WITH_CHECKER 宏,最终展开是一个 ObConfigTimeItem 类,重载 operator= 操作符让该类可以像基础数据类型一样进行赋值操作。

  1. #define _DEF_PARAMETER_CHECKER_EASY(access_specifier, param, scope, name, def, checker, args...) \
  2. access_specifier: \
  3. class ObConfig ## param ## Item ## _ ## name \
  4. : public common::ObConfig ## param ## Item \
  5. { \
  6. public: \
  7. ObConfig ## param ## Item ## _ ## name() \
  8. : common::ObConfig ## param ## Item( \
  9. local_container(), scope, #name, def, args) \
  10. { \
  11. add_checker(OB_NEW(checker, g_config_mem_attr)); \
  12. } \
  13. template <class T> \
  14. ObConfig ## param ## Item ## _ ## name& operator=(T value) \
  15. { \
  16. common::ObConfig ## param ## Item::operator=(value); \
  17. return *this; \
  18. } \
  19. } name;

ObConfigTimeItemXxx 继承的是一个 ObConfigIntegralItem 类,所以 time 类型的数据本质上是一个 int64_t 变量。

  1. class ObConfigTimeItem
  2. : public ObConfigIntegralItem
  3. {
  4. public:
  5. ......
  6. static const uint64_t VALUE_BUF_SIZE = 32UL;
  7. char value_str_[VALUE_BUF_SIZE];
  8. char value_reboot_str_[VALUE_BUF_SIZE];
  9. };

ObConfigIntegralItem 类重载了操作符 operator const int64_t &() const,在代码中访问该类对象的对象名时,实际上返回的是对象的 value_值。

  1. class ObConfigIntegralItem
  2. : public ObConfigItem
  3. {
  4. // get_value() return the real-time value
  5. int64_t get_value() const { return value_; }
  6. // get() return the real-time value if it does not need reboot, otherwise it return initial_value
  7. int64_t get() const { return value_; }
  8. operator const int64_t &() const { return value_; }
  9. private:
  10. int64_t value_;

配置项定义完成后,重新编译部署 OBServer,就可以对这个配置项进行查询和修改了。不过现在这个配置项没有任何功能,还需要在代码中用起来才能生效。

配置项初始化

集群配置项 ObServerConfig

集群配置项在 OBServer 启动时进行初始化,首先会从配置项的定义中获取默认值,然后从持久化配置文件中获取历史值(非首次启动),最后从 OBServer 的执行参数中获取最新值。它们的优先级是:执行参数 > 持久化配置文件 > 默认值。

配置项对象构造

集群配置项定义在 ObServerConfig 类中,通过在 ObConfigManager 类中定义了一个 ObServerConfig 实例,当 OBServer 启动时,它们及其成员变量的构造函数就会被调用。

  1. class ObServerConfig : public ObCommonConfig
  2. {
  3. public:
  4. friend class ObServerMemoryConfig;
  5. int init(const ObSystemConfig &config);
  6. static ObServerConfig &get_instance();
  7. #undef OB_CLUSTER_PARAMETER
  8. #define OB_CLUSTER_PARAMETER(args...) args
  9. #include "share/parameter/ob_parameter_seed.ipp"
  10. #undef OB_CLUSTER_PARAMETER
  11. }

每个配置项都有默认值,在配置项的构造函数中会将 value_ 置为默认值。

以下是相关函数的调用流程,以及关键函数的代码解析。有的函数只是省略了参数列表,不代表没有参数。

  • ObConfigTimeItem::ObConfigTimeItem()
  • ObConfigIntegralItem::init()
  • ObConfigItem::init()
  • ObConfigItem::set_value(const char *str)
  • ObConfigIntegralItem::set(const char *str)

其中 str 就是一开始定义的配置项默认值,将 str 中的字符串转化为对应类型的数据,然后赋值给 value_。

  1. inline bool ObConfigIntegralItem::set(const char *str)
  2. {
  3. bool valid = true;
  4. const int64_t value = parse(str, valid);
  5. if (valid) {
  6. value_ = value;
  7. }
  8. return valid;
  9. }

配置文件解析

当集群配置项都构造完成后,再从持久化的配置文件中 etc/observer.config.bin 加载配置项。(在 OBServer 初次启动时,配置文件为空,所以不会执行这一步)

  • ObServer::init()
  • ObServer::init_config()
  • ObConfigManager::load_config(const char *path)
  • ObServerConfig::deserialize_with_compat()
  • OB_DEF_DESERIALIZE(ObServerConfig)

先调用 ObCommonConfig::deserialize() 函数解析集群配置项,再调用 OTC_MGR.deserialize() 函数解析租户配置项。

  1. OB_DEF_DESERIALIZE(ObServerConfig)
  2. {
  3. } else if (OB_FAIL(ObCommonConfig::deserialize(buf, data_len, pos))) {
  4. LOG_ERROR("deserialize cluster config failed", K(ret));
  5. } else if (OB_FAIL(OTC_MGR.deserialize(buf, data_len, pos))){
  6. LOG_ERROR("deserialize tenant config failed", K(ret));
  7. }
  • OB_DEF_DESERIALIZE(ObCommonConfig)
  • ObCommonConfig::add_extra_config()

将文件中集群配置项的值加载到内存数据结构中。

  1. int ObCommonConfig::add_extra_config(const char *config_str,
  2. int64_t version /* = 0 */ ,
  3. bool check_name /* = false */,
  4. bool check_unit /* = true */)
  5. {
  6. ......
  7. while (OB_SUCC(ret) && OB_NOT_NULL(token)) {
  8. if (strncmp(token, "enable_production_mode=", 23) == 0) {
  9. ......
  10. } else if (OB_ISNULL(pp_item = container_.get(ObConfigStringKey(name)))) {
  11. ......
  12. if (OB_FAIL(ret) || OB_ISNULL(pp_item)) {
  13. } else if (check_unit && !(*pp_item)->check_unit(value)) {
  14. ret = OB_INVALID_CONFIG;
  15. LOG_ERROR("Invalid config value", K(name), K(value), K(ret));
  16. } else if (!(*pp_item)->set_value(value)) {
  17. ret = OB_INVALID_CONFIG;
  18. LOG_ERROR("Invalid config value", K(name), K(value), K(ret));
  19. } else if (!(*pp_item)->check()) {
  20. ret = OB_INVALID_CONFIG;
  21. const char* range = (*pp_item)->range();
  22. if (OB_ISNULL(range) || strlen(range) == 0) {
  23. LOG_ERROR("Invalid config, value out of range", K(name), K(value), K(ret));
  24. } else {
  25. _LOG_ERROR("Invalid config, value out of %s (for reference only). name=%s, value=%s, ret=%d", range, name, value, ret);
  26. }
  27. } else {
  28. (*pp_item)->set_version(version);
  29. LOG_INFO("Load config succ", K(name), K(value));
  30. }

执行参数解析

当配置文件中的数据加载完成后,再解析 OBServer 执行命令中的配置项参数。

  • ObServer::init_config()

其中 config_.add_extra_config(opts_.optstr_, start_time_) 函数将参数解析为配置项的值,然后加载到配置项结构中。最后调用 dump2file() 函数,将前面解析出来的配置项全部持久化到"etc/observer.config.bin"文件中。

  1. int ObServer::init_config()
  2. {
  3. int ret = OB_SUCCESS;
  4. bool has_config_file = true;
  5. // set dump path
  6. const char *dump_path = "etc/observer.config.bin";
  7. config_mgr_.set_dump_path(dump_path);
  8. if (OB_FILE_NOT_EXIST == (ret = config_mgr_.load_config())) {
  9. has_config_file = false;
  10. ret = OB_SUCCESS;
  11. }
  12. ......
  13. if (opts_.optstr_ && strlen(opts_.optstr_) > 0) {
  14. if (OB_FAIL(config_.add_extra_config(opts_.optstr_, start_time_))) {
  15. LOG_ERROR("invalid config from cmdline options", K(opts_.optstr_), KR(ret));
  16. }
  17. }
  18. ......
  19. if (is_arbitration_mode()) {
  20. // arbitration mode, dump config params to file directly
  21. if (OB_FAIL(config_mgr_.dump2file())) {
  22. LOG_ERROR("config_mgr_ dump2file failed", KR(ret));
  23. } else {
  24. LOG_INFO("config_mgr_ dump2file success", KR(ret));
  25. }

如果是用 OBD 部署集群,OBServer 第一次启动时,命令中会带上"xxx.yaml"文件中的启动参数。例如:

/data/user/observer1/bin/observer -p 23400 -P 23401 -z zone1 -c 1 -d /data/user/observer1/store -i lo -r 127.0.0.1:23401:23400 -o __min_full_resource_pool_memory=268435456,major_freeze_duty_time=Disable,datafile_size=20G,memory_limit=10G,system_memory=5G,cpu_count=24,stack_size=512K,cache_wash_threshold=1G,workers_per_cpu_quota=10,schema_history_expire_time=1d,net_thread_count=4,minor_freeze_times=10,enable_separate_sys_clog=False,enable_merge_by_turn=False,syslog_io_bandwidth_limit=10G,enable_async_syslog=False

集群第一次部署完成之后,"etc/observer.config.bin"文件才会被创建,之后重启 OBServer 就可以不带参数了,进程会从该文件中读取修改后的配置项。

租户配置项 ObTenantConfig

创建租户时初始化

租户级别配置项首先在创建租户时进行初始化,在代码中是一个 ObTenantConfig 对象。

(不只这一条调用路径,但最终会调用 add_tenant_config() 函数)

  • ObRpcNotifyTenantServerUnitResourceP::process()
  • ObTenantNodeBalancer::notify_create_tenant(oceanbase::obrpc::TenantServerUnitConfig const&)
  • ObTenantNodeBalancer::check_new_tenant(oceanbase::share::ObUnitInfoGetter::ObTenantConfig const&, long)
  • ObMultiTenant::create_tenant(oceanbase::omt::ObTenantMeta const&, bool, long)
  • ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)

申请一个新的 ObTenantConfig 对象,调用 init() 初始化,然后添加到 config_map_中。

  1. int ObTenantConfigMgr::add_tenant_config(uint64_t tenant_id)
  2. {
  3. int ret = OB_SUCCESS;
  4. ObTenantConfig *const *config = nullptr;
  5. DRWLock::WRLockGuard guard(rwlock_);
  6. if (is_virtual_tenant_id(tenant_id)
  7. || OB_NOT_NULL(config = config_map_.get(ObTenantID(tenant_id)))) {
  8. if (nullptr != config) {
  9. ObTenantConfig *new_config = *config;
  10. new_config->set_deleting(false);
  11. }
  12. } else {
  13. ObTenantConfig *new_config = nullptr;
  14. new_config = OB_NEW(ObTenantConfig, SET_USE_UNEXPECTED_500("TenantConfig"), tenant_id);
  15. if (OB_NOT_NULL(new_config)) {
  16. if(OB_FAIL(new_config->init(this))) {
  17. LOG_WARN("new tenant config init failed", K(ret));
  18. } else if (OB_FAIL(config_map_.set_refactored(ObTenantID(tenant_id),
  19. new_config, 0))) {
  20. LOG_WARN("add new tenant config failed", K(ret));
  21. }
  22. ......

因为 ObTenantConfig 引入了租户配置项的定义,因此在构造 ObTenantConfig 对象时就完成了配置项的构造。

  1. class ObTenantConfig : public ObCommonConfig
  2. {
  3. #undef OB_TENANT_PARAMETER
  4. #define OB_TENANT_PARAMETER(args...) args
  5. #include "share/parameter/ob_parameter_seed.ipp"
  6. #undef OB_TENANT_PARAMETER
  7. };

重启时初始化

在 OBServer 重新启动时,已有租户的配置项也会进行初始化。

  • OB_DEF_DESERIALIZE(ObServerConfig)

调用 OTC_MGR.deserialize() 函数。

  1. OB_DEF_DESERIALIZE(ObServerConfig)
  2. {
  3. } else if (OB_FAIL(ObCommonConfig::deserialize(buf, data_len, pos))) {
  4. LOG_ERROR("deserialize cluster config failed", K(ret));
  5. } else if (OB_FAIL(OTC_MGR.deserialize(buf, data_len, pos))){
  6. LOG_ERROR("deserialize tenant config failed", K(ret));
  7. }
  • OB_DEF_DESERIALIZE(ObTenantConfigMgr)

读取文件中租户配置项,根据 tenant_id 获取 config,然后将buf中的配置项解析到 config 中。

  1. OB_DEF_DESERIALIZE(ObTenantConfigMgr)
  2. {
  3. int ret = OB_SUCCESS;
  4. if (data_len == 0 || pos >= data_len) {
  5. } else {
  6. while(OB_SUCC(ret) && pos < data_len) {
  7. ......
  8. ObTenantConfig *config = nullptr;
  9. if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
  10. if (ret != OB_HASH_NOT_EXIST || OB_FAIL(add_tenant_config(tenant_id))) {
  11. LOG_ERROR("get tenant config failed", K(tenant_id), K(ret));
  12. break;
  13. }
  14. ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
  15. }
  16. if (OB_SUCC(ret)) {
  17. pos = saved_pos;
  18. ret = config->deserialize(buf, data_len, pos);
  19. }
  20. ......

在代码中查询配置项

外部查询

当用户使用"show ..."类型的命令时,OBServer 内部有统一的处理函数:ObShowResolver::resolve()。查询配置项的命令会被解析为 T_SHOW_PARAMETERS 类型,通过查询 __all_virtual_tenant_parameter_stat 表获取配置项的值。

  1. int ObShowResolver::resolve(const ParseNode &parse_tree)
  2. {
  3. ......
  4. case T_SHOW_PARAMETERS: {
  5. ......
  6. if (params_.show_seed_) {
  7. char local_ip[OB_MAX_SERVER_ADDR_SIZE] = "";
  8. if (OB_UNLIKELY(true != GCONF.self_addr_.ip_to_string(local_ip, sizeof(local_ip)))) {
  9. ret = OB_CONVERT_ERROR;
  10. } else {
  11. GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS_SEED);
  12. GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS_SEED, REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME), REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),
  13. local_ip, GCONF.self_addr_.get_port());
  14. }
  15. } else if (OB_SYS_TENANT_ID == show_tenant_id) {
  16. GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS);
  17. GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS,
  18. REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME),
  19. REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),
  20. show_tenant_id);
  21. } else {
  22. GEN_SQL_STEP_1(ObShowSqlSet::SHOW_PARAMETERS);
  23. GEN_SQL_STEP_2(ObShowSqlSet::SHOW_PARAMETERS,
  24. REAL_NAME(OB_SYS_DATABASE_NAME, OB_ORA_SYS_SCHEMA_NAME),
  25. REAL_NAME(OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME, OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_ORA_TNAME),
  26. show_tenant_id);
  27. }
  28. }
  29. // 最终查询的虚拟表为 __all_virtual_tenant_parameter_stat
  30. const char *const OB_ALL_VIRTUAL_TENANT_PARAMETER_STAT_TNAME = "__all_virtual_tenant_parameter_stat";

查询该虚拟表需要先初始化一个 ObAllVirtualTenantParameterStat 对象,然后调用 inner_open() 函数,再循环调用 inner_get_next_row() 函数遍历虚拟表。

  • ObVirtualTableIterator::get_next_row(common::ObNewRow*&)
  • ObAllVirtualTenantParameterStat::inner_get_next_row(common::ObNewRow*&)

其中 inner_sys_get_next_row() 函数获取集群配置项,inner_tenant_get_next_row() 函数获取租户配置项,最后根据查询条件进行筛选,返回满足条件的配置项。

  1. int ObAllVirtualTenantParameterStat::inner_get_next_row(ObNewRow *&row)
  2. {
  3. int ret = OB_SUCCESS;
  4. if (OB_UNLIKELY(!inited_)) {
  5. ret = OB_NOT_INIT;
  6. SERVER_LOG(WARN, "not inited", K(inited_), KR(ret));
  7. } else if (show_seed_) {
  8. ret = inner_seed_get_next_row(row);
  9. } else {
  10. if (OB_SUCC(inner_sys_get_next_row(row))) {
  11. } else if (OB_ITER_END == ret) {
  12. ret = inner_tenant_get_next_row(row);
  13. }
  14. }
  15. return ret;
  16. }
  • ObAllVirtualTenantParameterStat::inner_sys_get_next_row(common::ObNewRow*&)

从 GCONF 中获取集群配置项。

  1. int ObAllVirtualTenantParameterStat::inner_sys_get_next_row(common::ObNewRow *&row)
  2. {
  3. /*cluster parameter does not belong to any tenant*/
  4. return fill_row_(row, sys_iter_, GCONF.get_container(), NULL);
  5. }
  6. // 集群配置项迭代器也就是 GCONF 中一个 hashmap 的迭代器
  7. sys_iter_ = GCONF.get_container().begin();
  • ObAllVirtualTenantParameterStat::inner_tenant_get_next_row(common::ObNewRow *&row)

从 tenant_config_中获取租户配置项。

  1. int ObAllVirtualTenantParameterStat::inner_tenant_get_next_row(common::ObNewRow *&row)
  2. {
  3. // 租户配置项迭代器也就是 tenant_config_ 中一个 hashmap 的迭代器
  4. if (cur_tenant_idx_ < 0 // first come-in
  5. // current tenant is over
  6. || (tenant_config_.is_valid() && tenant_iter_ == tenant_config_->get_container().end())) {
  7. // find next valid tenant
  8. while (OB_SUCC(ret) && ++cur_tenant_idx_ < tenant_id_list_.count()) {
  9. uint64_t tenant_id = tenant_id_list_.at(cur_tenant_idx_);
  10. tenant_config_.set_config(TENANT_CONF(tenant_id));
  11. if (tenant_config_.is_valid()) {
  12. tenant_iter_ = tenant_config_->get_container().begin();
  13. ......
  14. }
  15. ......
  16. } else {
  17. const uint64_t tenant_id = tenant_id_list_.at(cur_tenant_idx_);
  18. if (OB_FAIL(fill_row_(row,
  19. tenant_iter_,
  20. tenant_config_->get_container(),
  21. &tenant_id))) {
  22. SERVER_LOG(WARN, "fill row fail", KR(ret), K(tenant_id), K(tenant_config_->get_tenant_id()),
  23. K(cur_tenant_idx_), K(tenant_id_list_));
  24. }

内部获取

集群配置项

因为配置项重载了操作符 operator &(),所以集群配置项直接通过 GCONF.xxx 的形式访问即可。

  1. #include "share/config/ob_server_config.h"
  2. GCONF.enable_sql_audit

在代码中有需要的地方,可以用“if (GCONF.enable_xxx)”来控制分支的走向,或者用“GCONF.xxx_time”来进行时间的计算,这样就可以把配置项使用起来了。

租户配置项

访问租户配置项需要先调用 OTC_MGR.read_tenant_config() 函数获取租户的 config,然后从 config 中获取指定的配置项。以租户配置项 max_stale_time_for_weak_consistency 为例,为其封装一个取值函数。

  • ObWeakReadUtil::max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn)

如果获取租户 config 成功,则从 config 中获取配置项的值,否则返回配置项的默认值并打印日志。

  1. int64_t ObWeakReadUtil::max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn)
  2. {
  3. int64_t max_stale_time = 0;
  4. OTC_MGR.read_tenant_config(
  5. tenant_id,
  6. oceanbase::omt::ObTenantConfigMgr::default_fallback_tenant_id(),
  7. /* success */ [&max_stale_time](const omt::ObTenantConfig &config) mutable {
  8. max_stale_time = config.max_stale_time_for_weak_consistency;
  9. },
  10. /* failure */ [tenant_id, ignore_warn, &max_stale_time]() mutable {
  11. max_stale_time = DEFAULT_MAX_STALE_TIME_FOR_WEAK_CONSISTENCY;
  12. if (IGNORE_TENANT_EXIST_WARN != ignore_warn && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
  13. TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "tenant not exist when get max stale time for weak consistency,"
  14. " use default max stale time instead",
  15. K(tenant_id), K(max_stale_time), K(lbt()));
  16. }
  17. }
  18. );
  19. return max_stale_time;
  20. }
  • ObTenantConfigMgr::read_tenant_config()

从 config_map_中获取 tenant_id 对应的config,成功则调用 SuccessFunctor,失败则调用 FailureFunctor。

  1. int ObTenantConfigMgr::read_tenant_config(
  2. const uint64_t tenant_id,
  3. const uint64_t fallback_tenant_id,
  4. const SuccessFunctor &on_success,
  5. const FailureFunctor &on_failure) const
  6. {
  7. int ret = OB_SUCCESS;
  8. ObTenantConfig *config = nullptr;
  9. DRWLock::RDLockGuard guard(rwlock_);
  10. if (OB_FAIL(config_map_.get_refactored(ObTenantID(tenant_id), config))) {
  11. if (fallback_tenant_id > 0 && OB_INVALID_ID != fallback_tenant_id) {
  12. if (OB_FAIL(config_map_.get_refactored(ObTenantID(fallback_tenant_id), config))) {
  13. LOG_WARN("failed to get tenant config", K(fallback_tenant_id), K(ret), K(lbt()));
  14. }
  15. } else {
  16. LOG_WARN("failed to get tenant config", K(tenant_id), K(ret));
  17. }
  18. }
  19. if (OB_SUCC(ret) && OB_NOT_NULL(config)) {
  20. on_success(*config);
  21. } else {
  22. on_failure();
  23. LOG_WARN("fail read tenant config", K(tenant_id), K(ret));
  24. }
  25. return ret;
  26. }

在代码中修改配置项

外部修改

修改集群配置项

系统租户执行修改集群配置项命令时,内部会向 __all_sys_parameter 表中插入一条记录(该表只记录增量数据),实际执行的是以下sql命令。

  1. select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter
  2. INSERT INTO __all_sys_parameter (zone, svr_type, svr_ip, svr_port, name, data_type, value, info, config_version, gmt_modified, section, scope, source, edit_level) VALUES ('', 'observer', 'ANY', 0, 'cpu_count', 'varchar', '9', '', 1689734424757370, usec_to_time(1689734424757370), 'OBSERVER', 'CLUSTER', 'DEFAULT', 'DYNAMIC_EFFECTIVE') ON DUPLICATE KEY UPDATE data_type = 'varchar', value = '9', info = '', config_version = 1689734424757370, gmt_modified = usec_to_time(1689734424757370), section = 'OBSERVER', scope = 'CLUSTER', source = 'DEFAULT', edit_level = 'DYNAMIC_EFFECTIVE'
  3. select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter
  4. select config_version, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, section, scope, source, edit_level from __all_sys_parameter

将修改后的值插入内部表之后,修改配置项命令就执行完成了。之后内部会将表中的增量数据刷新到各节点的本地数据结构中,此时才算真正完成了集群配置项更新。

修改租户配置项

执行租户配置项修改命令后,内部会执行两条sql,先往 __tenant_parameter 内部表中插入一行数据(该表只记录增量数据),然后往 __all_rootservice_event_history 内部表中插入一条修改记录。

  1. INSERT INTO __tenant_parameter (tenant_id, zone, svr_type, svr_ip, svr_port, name, data_type, value, info, config_version, gmt_modified, section, scope, source, edit_level) VALUES (1004, '', 'observer', 'ANY', 0, 'max_stale_time_for_weak_consistency', 'varchar', '7s', '', 1689732907024711, usec_to_time(1689732907024711), 'TENANT', 'TENANT', 'DEFAULT', 'DYNAMIC_EFFECTIVE') ON DUPLICATE KEY UPDATE data_type = 'varchar', value = '7s', info = '', config_version = 1689732907024711, gmt_modified = usec_to_time(1689732907024711), section = 'TENANT', scope = 'TENANT', source = 'DEFAULT', edit_level = 'DYNAMIC_EFFECTIVE'
  2. INSERT INTO __all_rootservice_event_history (gmt_create, module, event, name1, value1, name2, value2, rs_svr_ip, rs_svr_port) VALUES (usec_to_time(1689732907031822), 'root_service', 'admin_set_config', 'ret', 0, 'arg', '{items:[{name:"max_stale_time_for_weak_consistency", value:"7s", comment:"", zone:"", server:"0.0.0.0:0", tenant_name:"", exec_tenant_id:1004, tenant_ids:[1004]}], is_inner:false}', '127.0.0.1', 23401

同样的,修改内部表之后SQL命令就会返回,之后再由后台线程刷新各节点的租户配置项。

内部更新(同步机制

内部主动更新配置项,就是把 __tenant_parameter 和 __all_sys_parameter 内部表中的增量配置项同步到本地的过程。

集群配置项同步
  • ObLeaseStateMgr::start_heartbeat()

后台任务 hb_ 每2s执行一次。

  1. int ObLeaseStateMgr::start_heartbeat()
  2. {
  3. int ret = OB_SUCCESS;
  4. if (!inited_) {
  5. ret = OB_NOT_INIT;
  6. LOG_WARN("not init", K(ret));
  7. } else {
  8. const bool repeat = false;
  9. if (OB_FAIL(hb_timer_.schedule(hb_, DELAY_TIME, repeat))) {
  10. LOG_WARN("schedule failed", LITERAL_K(DELAY_TIME), K(repeat), K(ret));
  11. }
  12. }
  13. return ret;
  14. }
  15. static const int64_t DELAY_TIME = 2 * 1000 * 1000;//2s
  • ObLeaseStateMgr::HeartBeat::runTimerTask()

......

  • ObHeartBeatProcess::do_heartbeat_event(oceanbase::share::ObLeaseResponse const&)
  • ObHeartBeatProcess::ObZoneLeaseInfoUpdateTask::runTimerTask()

......

  • ObConfigManager::got_version(long, bool)
  • ObConfigManager::UpdateTask::runTimerTask()

该函数是刷新配置项的后台定时任务,会调用 update_local() 函数,将内部表中的数据同步到本地配置项中。

  1. void ObConfigManager::UpdateTask::runTimerTask()
  2. {
  3. ......
  4. } else if (update_local_) {
  5. config_mgr_->current_version_ = version;
  6. if (OB_FAIL(config_mgr_->system_config_.clear())) {
  7. // just print log, ignore ret
  8. LOG_WARN("Clear system config map failed", K(ret));
  9. } else {
  10. // do nothing
  11. }
  12. if (OB_FAIL(config_mgr_->update_local(version))) {
  13. LOG_WARN("Update local config failed", K(ret));
  14. // recovery current_version_
  15. config_mgr_->current_version_ = old_current_version;
  16. // retry update local config in 1s later
  17. if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::CONFIG_MGR, *this, 1000 * 1000L, false))) {
  18. LOG_WARN("Reschedule update local config failed", K(ret));
  19. }
  • ObConfigManager::update_local(int64_t expected_version)

该函数主要做了以下操作:

  1. sql_client_retry_weak.read():从 __all_sys_parameter 内部表中读取增量配置项;
  2. system_config_.update():将配置项的新值更新到本地;
  3. reload_config():重新加载和校验配置项;
  4. dump2file():将配置项同步到 observer.config.bin 文件中;
  1. int ObConfigManager::update_local(int64_t expected_version)
  2. {
  3. int ret = OB_SUCCESS;
  4. if (OB_ISNULL(sql_proxy_)) {
  5. ret = OB_NOT_INIT;
  6. LOG_WARN("sql proxy is null", K(ret));
  7. } else {
  8. ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
  9. SMART_VAR(ObMySQLProxy::MySQLResult, result) {
  10. int64_t start = ObTimeUtility::current_time();
  11. const char *sqlstr = "select config_version, zone, svr_type, svr_ip, svr_port, name, "
  12. "data_type, value, info, section, scope, source, edit_level "
  13. "from __all_sys_parameter";
  14. if (OB_FAIL(sql_client_retry_weak.read(result, sqlstr))) {
  15. LOG_WARN("read config from __all_sys_parameter failed", K(sqlstr), K(ret));
  16. } else if (OB_FAIL(system_config_.update(result))) {
  17. LOG_WARN("failed to load system config", K(ret));
  18. ......
  19. if (OB_SUCC(ret)) {
  20. if ('\0' == dump_path_[0]) {
  21. ret = OB_NOT_INIT;
  22. LOG_ERROR("Dump path doesn't set, stop read config", K(ret));
  23. } else if (OB_FAIL(server_config_.read_config())) {
  24. LOG_ERROR("Read server config failed", K(ret));
  25. } else if (OB_FAIL(reload_config())) {
  26. LOG_WARN("Reload configuration failed", K(ret));
  27. } else {
  28. DRWLock::RDLockGuard guard(OTC_MGR.rwlock_); // need protect tenant config because it will also serialize tenant config
  29. if (OB_FAIL(dump2file())) {
  30. LOG_WARN("Dump to file failed", K_(dump_path), K(ret));
  31. ......

租户配置项同步
  • ObLeaseStateMgr::HeartBeat::runTimerTask()

......

  • ObTenantConfig::got_version(long, bool)
  • ObTenantConfig::TenantConfigUpdateTask::runTimerTask()

租户配置项同样有一个后台定时任务,只要配置项的最新版本大于本地版本,就会触发更新操作。

  1. void ObTenantConfig::TenantConfigUpdateTask::runTimerTask()
  2. {
  3. int ret = OB_SUCCESS;
  4. if (OB_ISNULL(config_mgr_)) {
  5. ret = OB_NOT_INIT;
  6. LOG_WARN("invalid argument", K_(config_mgr), K(ret));
  7. } else if (OB_ISNULL(tenant_config_)){
  8. ret = OB_NOT_INIT;
  9. LOG_WARN("invalid argument", K_(tenant_config), K(ret));
  10. } else {
  11. const int64_t saved_current_version = tenant_config_->current_version_;
  12. const int64_t version = version_;
  13. THIS_WORKER.set_timeout_ts(INT64_MAX);
  14. if (tenant_config_->current_version_ >= version) {
  15. ret = OB_ALREADY_DONE;
  16. } else if (update_local_) {
  17. tenant_config_->current_version_ = version;
  18. if (OB_FAIL(tenant_config_->system_config_.clear())) {
  19. LOG_WARN("Clear system config map failed", K(ret));
  20. } else if (OB_FAIL(config_mgr_->update_local(tenant_config_->tenant_id_, version))) {
  21. LOG_WARN("ObTenantConfigMgr update_local failed", K(ret), K(tenant_config_));
  22. } else {
  23. config_mgr_->notify_tenant_config_changed(tenant_config_->tenant_id_);
  24. }
  • ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)

查询 __tenant_parameter 内部表获取当前租户的 config,然后将新的数据更新到 config中。

  1. int ObTenantConfigMgr::update_local(uint64_t tenant_id, int64_t expected_version)
  2. {
  3. SMART_VAR(ObMySQLProxy::MySQLResult, result) {
  4. if (OB_FAIL(sql.assign_fmt(
  5. "select config_version, zone, svr_type, svr_ip, svr_port, name, "
  6. "data_type, value, info, section, scope, source, edit_level "
  7. "from %s where tenant_id = '%lu'", OB_TENANT_PARAMETER_TNAME, tenant_id))) {
  8. } else if (OB_FAIL(sql_client_retry_weak.read(result, exec_tenant_id, sql.ptr()))) {
  9. LOG_WARN("read config from __tenant_parameter failed",
  10. KR(ret), K(tenant_id), K(exec_tenant_id), K(sql));
  11. } else {
  12. DRWLock::WRLockGuard guard(rwlock_);
  13. ret = config_map_.get_refactored(ObTenantID(tenant_id), config);
  14. if (OB_FAIL(ret)) {
  15. LOG_ERROR("failed to get tenant config", K(tenant_id), K(ret));
  16. } else {
  17. ret = config->update_local(expected_version, result);
  18. }
  • ObTenantConfig::update_local()

调用 system_config_.update() 函数将配置项更新到本地,然后调用 dump2file() 将配置项持久化到文件中。

  1. int ObTenantConfig::update_local(int64_t expected_version, ObMySQLProxy::MySQLResult &result,
  2. bool save2file /* = true */)
  3. {
  4. int ret = OB_SUCCESS;
  5. if (OB_FAIL(system_config_.update(result))) {
  6. LOG_WARN("failed to load system config", K(ret));
  7. if (OB_SUCC(ret)) {
  8. if (OB_FAIL(read_config())) {
  9. LOG_ERROR("Read tenant config failed", K_(tenant_id), K(ret));
  10. } else if (save2file && OB_FAIL(config_mgr_->dump2file())) {
  11. LOG_WARN("Dump to file failed", K(ret));

小结

新增配置项并不复杂,代码中已经实现了成熟的访问和同步机制,只需要使用合适的宏并填上一些参数就可以定义新配置项了,而后续如何使用这个配置项才是实现新功能的关键。在修改配置项的过程中,实际上还会进行一些合法性检查,这部分会在后面的文章中与系统变量一起进行说明。

本专题下一篇文章是关于“系统变量的定义和源码解析”,系统变量的机制有别于配置项,还有全局级和会话级的区分,感兴趣的同学欢迎继续关注。

参考资料

  1. 配置项总览
  2. OceanBase源码
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/846060
推荐阅读
相关标签
  

闽ICP备14008679号