当前位置:   article > 正文

OceanBase 配置项&系统变量实现及应用详解(4):新增系统变量_oceanbase 增加全局变量

oceanbase 增加全局变量

本专题的前几篇文章已经详细阐述了OceanBase配置项和系统变量的基础用法,并对配置项的源码进行了剖析。但有一些同学可能还对系统变量的实现方式有兴趣,并希望能够像自定义配置项那样,新增一个系统变量。

本文将围绕“如何新增系统变量”这一话题展开,结合系统变量的源码实现,为大家详细讲解系统变量的定义、初始化过程、内部访问方式以及同步机制。帮助大家更好地理解和运用系统变量。

系统变量概述

一个系统变量可以同时具有 global 级别和 session 级别的数据,在代码中存在多个副本。这与配置项(参数)不同,一个配置项要么是集群级别,要么是租户级别。

系统变量的定义

要增加一个新的变量,首先在 src/share/system_variable/ob_system_variable_init.json 文件中按照下面的格式编写一段代码。

  1. "ob_query_timeout": {
  2. "id": 10005,
  3. "name": "ob_query_timeout",
  4. "default_value": "10000000",
  5. "base_value": "10000000",
  6. "data_type": "int",
  7. "info": "Query timeout in microsecond(us)",
  8. "flags": "GLOBAL | SESSION | NEED_SERIALIZE",
  9. "on_check_and_convert_func": "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large",
  10. "publish_version": "",
  11. "info_cn": "",
  12. "background_cn": "",
  13. "ref_url": ""
  14. },

其中每一项的含义如下:

2参数说明举例
对象名(括号外)数据结构的名字,与括号内的变量名一致ob_query_timeout
id变量的序号,按顺序手动分配0, 1, 95, 10001, 10037
name变量名,与括号外的对象名一致ob_query_timeout
default_value默认值0, 1000, disabled, binary
base_value基准值,初始与默认值相同,用于不同session间的变量同步0, 1000, disabled, binary
data_type数据类型int, varchar, enum, bool
info描述信息,描述变量的主要功能"Query timeout in microsecond(us)"
flags标签,包括级别、是否序列化、是否可见等GLOBAL | SESSION | NEED_SERIALIZE | ORACLE_ONLY | INVISIBLE
on_check_and_convert_func合法性检查和格式转换函数(可选)ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large
min_val最小值(可选)1
max_val最大值(可选)10000
enum_names枚举类型的取值列表(可选)"enum_names": ["OFF", "AUTO", "FORCE"],
publish_version发布版本号"227", "420"(即2.2.7,4.2.0)
info_cn中文描述"查询超时时间(微秒)"
background_cn添加原因/背景"避免死锁、一直重试等问题导致的sql阻塞的情况"
ref_url变量申请链接"xxx"

然后执行 ./gen_ob_sys_variables.py 脚本,新增的变量会出现在 ob_system_variable_init.cpp 文件中。

  1. [&] (){
  2. ObSysVars[100].default_value_ = "10000000" ;
  3. ObSysVars[100].info_ = "Query timeout in microsecond(us)" ;
  4. ObSysVars[100].name_ = "ob_query_timeout" ;
  5. ObSysVars[100].data_type_ = ObIntType ;
  6. ObSysVars[100].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
  7. ObSysVars[100].on_check_and_convert_func_ = "ObSysVarOnCheckFuncs::check_and_convert_timeout_too_large" ;
  8. ObSysVars[100].id_ = SYS_VAR_OB_QUERY_TIMEOUT ;
  9. cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_OB_QUERY_TIMEOUT)) ;
  10. ObSysVarsIdToArrayIdx[SYS_VAR_OB_QUERY_TIMEOUT] = 100 ;
  11. ObSysVars[100].base_value_ = "10000000" ;
  12. ObSysVars[100].alias_ = "OB_SV_QUERY_TIMEOUT" ;
  13. }();

这些变量的初始值都会存储在以下数组中。其中 ObSysVars 数组保存了从 json 文件生成的最原始的变量信息;ObSysVarDefaultValues 数组存储了系统变量的默认值;ObSysVarBaseValues 数组存储了系统变量的基准值。每新增一个系统变量,这些数组中就会多一个元素。

  1. namespace oceanbase
  2. {
  3. namespace share
  4. {
  5. // 变量的初始信息
  6. static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  7. // 变量的默认值
  8. static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  9. // 变量的基准值
  10. static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];

系统变量缓存的定义

在某些场景为满足功能或性能的需求,需要额外为系统变量增加一份缓存(按需定义,不是必须的)。系统变量的缓存通过 DEF_SYS_VAR_CACHE_FUNCS 和 DEF_SYS_VAR_CACHE_FUNCS_STR 宏定义。

  1. class ObBasicSessionInfo
  2. {
  3. private:
  4. #define DEF_SYS_VAR_CACHE_FUNCS(SYS_VAR_TYPE, SYS_VAR_NAME) \
  5. void set_##SYS_VAR_NAME(SYS_VAR_TYPE value) \
  6. { \
  7. inc_data_.SYS_VAR_NAME##_ = (value); \
  8. inc_##SYS_VAR_NAME##_ = true; \
  9. } \
  10. void set_base_##SYS_VAR_NAME(SYS_VAR_TYPE value) \
  11. { \
  12. base_data_.SYS_VAR_NAME##_ = (value); \
  13. } \
  14. const SYS_VAR_TYPE &get_##SYS_VAR_NAME() const \
  15. { \
  16. return get_##SYS_VAR_NAME(inc_##SYS_VAR_NAME##_); \
  17. } \
  18. const SYS_VAR_TYPE &get_##SYS_VAR_NAME(bool is_inc) const \
  19. { \
  20. return is_inc ? inc_data_.SYS_VAR_NAME##_ : base_data_.SYS_VAR_NAME##_; \
  21. }
  22. class SysVarsCache
  23. {
  24. public:
  25. DEF_SYS_VAR_CACHE_FUNCS(int64_t, ob_max_read_stale_time);
  26. DEF_SYS_VAR_CACHE_FUNCS(bool, tx_read_only);
  27. DEF_SYS_VAR_CACHE_FUNCS_STR(nls_date_format);
  28. }
  29. private:
  30. static SysVarsCacheData base_data_;
  31. SysVarsCacheData inc_data_;
  32. union {
  33. uint64_t inc_flags_;
  34. struct {
  35. bool inc_auto_increment_increment_:1;
  36. bool inc_sql_throttle_current_priority_:1;
  37. ......

其中 base_data_ 表示基准数据,它是全局唯一的,与“global变量”对应;inc_data_ 表示最新数据,每个session各有一份,与“session变量”对应。

以 ob_max_read_stale_time 变量为例,上面的宏展开后生成以下代码。

  1. void set_ob_max_read_stale_time(int64_t value)
  2. {
  3. inc_data_.ob_max_read_stale_time_ = (value);
  4. inc_ob_max_read_stale_time_ = true;
  5. }
  6. void set_base_ob_max_read_stale_time(int64_t value)
  7. {
  8. base_data_.ob_max_read_stale_time_ = (value);
  9. }
  10. const int64_t &get_ob_max_read_stale_time() const
  11. {
  12. return get_ob_max_read_stale_time(inc_ob_max_read_stale_time_);
  13. }
  14. const int64_t &get_ob_max_read_stale_time(bool is_inc) const
  15. {
  16. return is_inc ? inc_data_.ob_max_read_stale_time_ : base_data_.ob_max_read_stale_time_;
  17. }

其中 set_ob_max_read_stale_time() 函数会修改 inc_ob_max_read_stale_time_ 为 true 并更新 inc_data_;然后调用 get_ob_max_read_stale_time() 函数会返回修改过的数据 inc_data_.ob_max_read_stale_time_,未修改则会返回基准数据 base_data_.ob_max_read_stale_time_。

变量初始化

定义上面的一系列数据结构后,接下来在 OBServer 启动时系统变量会进行初始化。

变量基础数据

ObSysVars 数组保存了最原始的变量信息,在静态对象的构造函数中进行初始化。

  1. namespace oceanbase
  2. {
  3. namespace share
  4. {
  5. // 变量的初始信息
  6. static ObSysVarFromJson ObSysVars[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  7. // 变量的默认值
  8. static ObObj ObSysVarDefaultValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  9. // 变量的基准值
  10. static ObObj ObSysVarBaseValues[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  11. static struct VarsInit{
  12. VarsInit(){
  13. // 保存当前系统变量的最大的id
  14. int64_t cur_max_var_id = 0;
  15. // ObSysVarsIdToArrayIdx数组默认初始值为-1,-1表示无效索引
  16. memset(ObSysVarsIdToArrayIdx, -1, sizeof(ObSysVarsIdToArrayIdx));
  17. [&] (){
  18. ObSysVars[0].default_value_ = "1" ;
  19. ObSysVars[0].info_ = "" ;
  20. ObSysVars[0].name_ = "auto_increment_increment" ;
  21. ObSysVars[0].data_type_ = ObUInt64Type ;
  22. ObSysVars[0].min_val_ = "1" ;
  23. ObSysVars[0].max_val_ = "65535" ;
  24. ObSysVars[0].flags_ = ObSysVarFlag::GLOBAL_SCOPE | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::NEED_SERIALIZE ;
  25. ObSysVars[0].id_ = SYS_VAR_AUTO_INCREMENT_INCREMENT ;
  26. cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR_AUTO_INCREMENT_INCREMENT)) ;
  27. ObSysVarsIdToArrayIdx[SYS_VAR_AUTO_INCREMENT_INCREMENT] = 0 ;
  28. ObSysVars[0].base_value_ = "1" ;
  29. ObSysVars[0].alias_ = "OB_SV_AUTO_INCREMENT_INCREMENT" ;
  30. }();

ObSysVarDefaultValues 数组保存了系统变量的默认值,ObSysVarBaseValues 数组保存了系统变量的基准值。在 OBServer 启动时,会初始化这两个数组:

  • ObServer::init()
  • ObPreProcessSysVars::init_sys_var()
  • ObSysVariables::init_default_values()

该函数遍历 ObSysVars 数组,将每个变量的 default_value_ 存储到 ObSysVarDefaultValues 数组中,将每个变量的 base_value_ 存储到 ObSysVarBaseValues 数组中。

  1. int ObSysVariables::init_default_values()
  2. {
  3. int ret = OB_SUCCESS;
  4. int64_t sys_var_count = get_amount();
  5. for (int64_t i = 0; OB_SUCC(ret) && i < sys_var_count; ++i) {
  6. const ObString &sys_var_val_str = ObSysVariables::get_value(i);
  7. const ObString &base_sys_var_val_str = ObSysVariables::get_base_str_value(i);
  8. ......
  9. ObSysVarDefaultValues[i] = out_obj;
  10. ObSysVarBaseValues[i] = base_out_obj;

global 变量:sysvar_array_

ObSysVariableSchema 是管理系统变量的 schema 对象,其中的 sysvar_array_ 数组存储了 global 变量的值,因此系统变量也具有 schema 多版本的能力。

  1. class ObSysVariableSchema : public ObSchema
  2. {
  3. private:
  4. uint64_t tenant_id_;
  5. int64_t schema_version_;
  6. ObSysVarSchema *sysvar_array_[ObSysVarFactory::ALL_SYS_VARS_COUNT];
  7. bool read_only_;
  8. class ObSysVarSchema : public ObSchema
  9. {
  10. private:
  11. uint64_t tenant_id_;
  12. common::ObString name_;
  13. common::ObObjType data_type_;
  14. common::ObString value_;
  15. common::ObString min_val_;
  16. common::ObString max_val_;
  17. common::ObString info_;
  18. common::ObZone zone_;
  19. int64_t schema_version_;
  20. int64_t flags_;

sysvar_array_ 数组在 OBServer 启动时进行初始化,首先调用的是 init_schema() 函数。

  • ObServer::init_schema()
  • ObMultiVersionSchemaService::init()
  • ObMultiVersionSchemaService::init_sys_tenant_user_schema()

先调用 load_default_system_variable() 将所有变量的默认值都加载到 sysvar_array_ 数组中,再调用 put_schema() 把这个 schema 添加到 schema_cache_ 中。

  1. int ObMultiVersionSchemaService::init_sys_tenant_user_schema()
  2. {
  3. ObSysVariableSchema sys_variable;
  4. if (OB_FAIL(sys_variable.load_default_system_variable(true))) {
  5. LOG_WARN("load sys tenant default system variable failed", K(ret));
  6. } else if (OB_FAIL(schema_cache_.put_schema(SYS_VARIABLE_SCHEMA,
  7. OB_SYS_TENANT_ID,
  8. sys_variable.get_tenant_id(),
  9. sys_variable.get_schema_version(),
  10. sys_variable))) {
  • ObSysVariableSchema::load_default_system_variable()

该函数遍历 ObSysVars 数组获取变量的默认值,然后调用 add_sysvar_schema() 函数将变量值添加到 sysvar_array_ 数组中。

  1. int ObSysVariableSchema::load_default_system_variable(bool is_sys_tenant)
  2. {
  3. int ret = OB_SUCCESS;
  4. ObString value;
  5. ObSysVarSchema sysvar;
  6. for (int64_t i = 0; OB_SUCC(ret) && i < ObSysVariables::get_amount(); ++i) {
  7. sysvar.reset();
  8. // 从 ObSysVariables 中获取默认数据
  9. if (is_sys_tenant && ObCharset::case_insensitive_equal(ObSysVariables::get_name(i), OB_SV_LOWER_CASE_TABLE_NAMES)) {
  10. value = ObString::make_string("2");
  11. } else {
  12. value = ObSysVariables::get_value(i);
  13. }
  14. } else if (OB_FAIL(sysvar.set_value(value))) {
  15. LOG_WARN("set sysvar value failed", K(ret), K(value));
  16. } else if (OB_FAIL(add_sysvar_schema(sysvar))) {
  17. LOG_WARN("add sysvar schema failed", K(ret));
  18. }
  19. }

session 变量:sys_vars_

ObBasicSessionInfo 是管理 session 相关信息的对象,其中的 sys_vars_ 数组存储了 session 变量的值,在建立 session 连接时进行初始化。

  1. class ObBasicSessionInfo
  2. {
  3. private:
  4. share::ObBasicSysVar *sys_vars_[share::ObSysVarFactory::ALL_SYS_VARS_COUNT];
外部 session

外部 session 连接是指从客户端发起的连接,一般用于处理用户的 sql 请求。外部 session 的变量缓存值在初始化时,会从 schema 中取值,同时也依赖 schema 的同步机制。

  • int ObMPConnect::process()

在建立连接时,先调用 create_session() 函数创建 session,然后调用 verify_identify() 初始化变量。

  • ObMPConnect::verify_identify()
  • ObMPConnect::load_privilege_info()

通过租户ID从 schema_cache_ 中获取对应的 sys_variable_schema,然后将 sys_variable_schema 作为参数调用 load_all_sys_vars() 函数。

  1. int ObMPConnect::load_privilege_info(ObSQLSessionInfo &session)
  2. {
  3. const ObSysVariableSchema *sys_variable_schema = NULL;
  4. } else if (OB_FAIL(schema_guard.get_sys_variable_schema(conn->tenant_id_, sys_variable_schema))) {
  5. LOG_WARN("get sys variable schema failed", K(ret));
  6. } else if (OB_ISNULL(sys_variable_schema)) {
  7. ret = OB_ERR_UNEXPECTED;
  8. LOG_WARN("sys variable schema is null", K(ret));
  9. } else if (OB_FAIL(session.init_tenant(tenant_name_, conn->tenant_id_))) {
  10. LOG_WARN("failed to init_tenant", K(ret));
  11. } else if (OB_FAIL(session.load_all_sys_vars(*sys_variable_schema, false))) {
  12. LOG_WARN("load system variables failed", K(ret));
  • ObBasicSessionInfo::load_all_sys_vars()

该函数每次循环,先从 ObSysVars 数组中获取 sys_var_id,然后从传入的 sys_var_schema 中获取 sys_var 对象,然后把变量的 name 和 value 作为参数,调用 load_sys_variable() 函数。

  1. int ObBasicSessionInfo::load_all_sys_vars(const ObSysVariableSchema &sys_var_schema, bool sys_var_created)
  2. {
  3. int ret = OB_SUCCESS;
  4. OZ (clean_all_sys_vars());
  5. if (!sys_var_created) {
  6. OZ (sys_var_fac_.create_all_sys_vars());
  7. }
  8. OX (influence_plan_var_indexs_.reset());
  9. ObArenaAllocator calc_buf(ObModIds::OB_SQL_SESSION);
  10. for (int64_t i = 0; OB_SUCC(ret) && i < get_sys_var_count(); i++) {
  11. ObSysVarClassType sys_var_id = ObSysVariables::get_sys_var_id(i);
  12. const ObSysVarSchema *sys_var = NULL;
  13. OZ (sys_var_schema.get_sysvar_schema(sys_var_id, sys_var), sys_var_id, i);
  14. OV (OB_NOT_NULL(sys_var));
  15. OZ (load_sys_variable(calc_buf, sys_var->get_name(), sys_var->get_data_type(),
  16. sys_var->get_value(), sys_var->get_min_val(),
  17. sys_var->get_max_val(), sys_var->get_flags(), true));
  • ObBasicSessionInfo::load_sys_variable() 

先调用 find_sys_var_id_by_name() 函数通过 name 获取 var_id,然后调用 create_sys_var() 函数获取(创建)变量指针 sys_var,再调用 sys_var->init() 给变量赋值,最后调用 process_session_variable() 函数给变量缓存赋值。

  1. int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
  2. const ObString &name,
  3. const ObObj &type,
  4. const ObObj &value,
  5. const ObObj &min_val,
  6. const ObObj &max_val,
  7. const int64_t flags,
  8. bool is_from_sys_table)
  9. {
  10. if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(name, is_from_sys_table))) {
  11. ......
  12. } else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
  13. LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
  14. ......
  15. } else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
  16. LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
  17. K(real_val), K(name), K(value));
  18. } else if (OB_FAIL(process_session_variable(var_id, real_val,
  19. false /*check_timezone_valid*/,
  20. false /*is_update_sys_var*/))) {
  • ObBasicSessionInfo::create_sys_var()

初次调用 create_sys_var() 函数时,sys_vars_[store_idx] 为空,然后调用 sys_var_fac_.create_sys_var() 函数生成 sys_var,再把 sys_var 赋给 sys_vars_[store_idx]。session 刚初始化时,sys_var 指向的是变量的默认值。

再次调用 create_sys_var() 函数时,sys_vars_[store_idx] 不为空,将其值赋给 sys_var,相当于获取了当前session 的变量。

  1. int ObBasicSessionInfo::create_sys_var(ObSysVarClassType sys_var_id,
  2. int64_t store_idx, ObBasicSysVar *&sys_var)
  3. {
  4. int ret = OB_SUCCESS;
  5. OV (0 <= store_idx && store_idx < ObSysVarFactory::ALL_SYS_VARS_COUNT,
  6. OB_ERR_UNEXPECTED, sys_var_id, store_idx);
  7. if (OB_NOT_NULL(sys_vars_[store_idx])) {
  8. OV (sys_vars_[store_idx]->get_type() == sys_var_id,
  9. OB_ERR_UNEXPECTED, sys_var_id, store_idx, sys_vars_[store_idx]->get_type());
  10. OX (sys_var = sys_vars_[store_idx]);
  11. } else {
  12. OZ (sys_var_fac_.create_sys_var(sys_var_id, sys_var), sys_var_id);
  13. OV (OB_NOT_NULL(sys_var), OB_ERR_UNEXPECTED, sys_var_id, store_idx);
  14. OX (sys_vars_[store_idx] = sys_var);
  15. }
  16. return ret;
  17. }

内部 session

内部 session 连接是指 OBServer 内部或 OBProxy 向 OBServer 发起的内部请求建立的连接,不是用户主动建立的连接。内部 session 的变量总是获取全局默认值作为初值,而不是从最新的 schema 中取值。

  • ObCommonSqlProxy::acquire()
  • ObInnerSQLConnectionPool::acquire()
  • ObInnerSQLConnection::init()
  • ObInnerSQLConnection::init_session()
  • ObInnerSQLConnection::init_session_info()
  • ObBasicSessionInfo::load_default_sys_variable()
  1. int ObBasicSessionInfo::load_default_sys_variable(const bool print_info_log, const bool is_sys_tenant, bool is_deserialized)
  2. {
  3. int ret = OB_SUCCESS;
  4. if (OB_FAIL(sys_var_fac_.create_all_sys_vars())) {
  5. LOG_WARN("fail create all sys variables", K(ret));
  6. } else if (OB_FAIL(init_system_variables(print_info_log, is_sys_tenant, is_deserialized))) {
  7. LOG_WARN("Init system variables failed !", K(ret));
  8. }
  9. return ret;
  10. }
  • ObBasicSessionInfo::init_system_variables()

调用 ObSysVariables::get_value(i) 获取 value,也就是从全局的 ObSysVars 数组中获取默认值,然后调用 load_sys_variable() 函数将 value 的值加载到 session 中。

  1. int ObBasicSessionInfo::init_system_variables(const bool print_info_log, const bool is_sys_tenant,
  2. bool is_deserialized)
  3. {
  4. for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
  5. name.assign_ptr(const_cast<char*>(ObSysVariables::get_name(i).ptr()),
  6. static_cast<ObString::obstr_size_t>(strlen(ObSysVariables::get_name(i).ptr())));
  7. bool is_exist = false;
  8. if (OB_FAIL(sys_variable_exists(name, is_exist))) {
  9. LOG_WARN("failed to check if sys variable exists", K(name), K(ret));
  10. } else if (!is_exist) {
  11. // Note: 如果已经初始化过 base value,则下面的流程不会执行
  12. var_type = ObSysVariables::get_type(i);
  13. var_flag = ObSysVariables::get_flags(i);
  14. value.set_varchar(is_deserialized ? ObSysVariables::get_base_str_value(i) :ObSysVariables::get_value(i));
  15. value.set_collation_type(ObCharset::get_system_collation());
  16. min_val.set_varchar(ObSysVariables::get_min(i));
  17. min_val.set_collation_type(ObCharset::get_system_collation());
  18. max_val.set_varchar(ObSysVariables::get_max(i));
  19. max_val.set_collation_type(ObCharset::get_system_collation());
  20. type.set_type(var_type);
  21. if(is_sys_tenant) {
  22. if (OB_FAIL(process_variable_for_tenant(name, value))) {
  23. LOG_WARN("process system variable for tenant error", K(name), K(value), K(ret));
  24. }
  25. }
  26. if (OB_SUCC(ret)) {
  27. if (OB_FAIL(load_sys_variable(calc_buf, name, type, value, min_val, max_val, var_flag, false))) {
  • ObBasicSessionInfo::load_sys_variable()

初次调用 create_sys_var() 函数时,创建 session 变量 sys_var,然后用传入的默认值 value 对其进行初始化。

  1. int ObBasicSessionInfo::load_sys_variable(ObIAllocator &calc_buf,
  2. const ObString &name,
  3. const ObObj &type,
  4. const ObObj &value,
  5. const ObObj &min_val,
  6. const ObObj &max_val,
  7. const int64_t flags,
  8. bool is_from_sys_table)
  9. {
  10. } else if (OB_FAIL(create_sys_var(var_id, store_idx, sys_var))) {
  11. LOG_WARN("fail to create sys var", K(name), K(value), K(ret));
  12. } else if (OB_FAIL(ObBasicSessionInfo::change_value_for_special_sys_var(
  13. var_id, val_ptr, real_val))) {
  14. LOG_WARN("fail to change value for special sys var", K(ret), K(var_id), K(val_ptr));
  15. } else if (OB_FAIL(sys_var->init(real_val, min_ptr, max_ptr, val_type.get_type(), flags))) {
  16. LOG_WARN("fail to init sys var", K(ret), K(sys_var->get_type()),
  17. K(real_val), K(name), K(value));
  18. } else if (OB_FAIL(process_session_variable(var_id, real_val,
  19. false /*check_timezone_valid*/,
  20. false /*is_update_sys_var*/))) {
  21. LOG_WARN("process system variable error", K(name), K(type), K(real_val), K(value), K(ret));

变量缓存基准值:base_data_

base_data_ 表示系统变量缓存的基准数据,它是 OBServer 全局唯一的。

  • ObServer::init()
  • ObBasicSessionInfo::init_sys_vars_cache_base_values()

遍历 ObSysVarDefaultValues 数组,将变量的默认值写到 ObBasicSessionInfo::base_data_ 中,这是一个 static SysVarsCacheData[] 类型的数组。

  1. int ObBasicSessionInfo::init_sys_vars_cache_base_values()
  2. {
  3. int ret = OB_SUCCESS;
  4. int64_t store_idx = OB_INVALID_INDEX_INT64;
  5. ObBasicSessionInfo::SysVarsCache sys_vars_cache;
  6. int64_t var_amount = ObSysVariables::get_amount();
  7. for (int64_t i = 0; OB_SUCC(ret) && i < var_amount; ++i) {
  8. store_idx = ObSysVarsToIdxMap::get_store_idx((int64_t)ObSysVariables::get_sys_var_id(i));
  9. OX (fill_sys_vars_cache_base_value(
  10. ObSysVariables::get_sys_var_id(i),
  11. sys_vars_cache,
  12. ObSysVariables::get_default_value(store_idx) ));
  13. }
  14. return ret;
  15. }

变量缓存增量值:inc_data_

inc_data_ 表示 session 变量的缓存值,每个 session 各有一份缓存数据。

外部session 的 inc_data_ 变量缓存值初始化为 schema 中的最新值,内部 session 的 inc_data_ 初始化为变量的默认值,也就是说变量缓存与 session 变量的初值是一致的,它们初始化的调用路径也基本相同。

变量查询

外部查询

查询 global 变量

global 变量的值是从 schema 中获取的,在执行如“show global variables like xxx”的命令时就会触发以下流程:

  • ObBasicSessionInfo::get_global_sys_variable()

先调用 get_sys_variable_schema() 函数获取 sys_variable_schema 对象,然后从该对象中获取 sysvar_schema。

  1. int ObBasicSessionInfo::get_global_sys_variable(const uint64_t actual_tenant_id, // 为了处理租户已经切掉的情况
  2. ObIAllocator &calc_buf,
  3. const ObDataTypeCastParams &dtc_params,
  4. const ObString &var_name,
  5. ObObj &val)
  6. {
  7. } else if (OB_FAIL(schema_guard.get_sys_variable_schema(actual_tenant_id, sys_variable_schema))) {
  8. LOG_WARN("get sys variable schema failed", K(ret));
  9. } else if (OB_ISNULL(sys_variable_schema)) {
  10. ret = OB_ERR_UNEXPECTED;
  11. LOG_WARN("sys variable schema is null", K(ret));
  12. } else if (OB_FAIL(sys_variable_schema->get_sysvar_schema(var_name, sysvar_schema))) {
  • ObSysVariableSchema::get_sysvar_schema(const ObString &name, const ObSysVarSchema *&sysvar_schema)
  • ObSysVariableSchema::get_sysvar_schema(ObSysVarClassType var_id, const ObSysVarSchema *&sysvar_schema)
  • ObSysVariableSchema::get_sysvar_schema(int64_t idx)

该函数根据传入的 idx 获取对应的变量,最终返回 sysvar_array_[idx]。

  1. const ObSysVarSchema *ObSysVariableSchema::get_sysvar_schema(int64_t idx) const
  2. {
  3. const ObSysVarSchema *ret = NULL;
  4. if (idx >= 0 && idx < get_sysvar_count()) {
  5. ret = sysvar_array_[idx];
  6. }
  7. return ret;
  8. }

查询 session 变量

session 变量的值是从 session 本地数组 sys_vars_ 中获取的,执行如“show variables like xxx”的命令时会触发以下调用流程:

  • ObResultSet::inner_get_next_row()
  • ......
  • ObBasicSessionInfo::get_sys_variable_by_name()
  • ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, oceanbase::share::ObBasicSysVar*&)
  • ObBasicSessionInfo::inner_get_sys_var(oceanbase::common::ObString const&, long&, oceanbase::share::ObBasicSysVar*&)

该函数先调用 find_sys_var_id_by_name() 函数通过 sys_var_name 获取 sys_var_id,然后调用 calc_sys_var_store_idx() 函数利用 sys_var_id 计算出 store_idx,也就是变量在数组中的下标,最后返回 sys_vars_[store_idx] 即可。

  1. int ObBasicSessionInfo::inner_get_sys_var(const ObString &sys_var_name,
  2. int64_t &store_idx,
  3. ObBasicSysVar *&sys_var) const
  4. {
  5. int ret = OB_SUCCESS;
  6. ObSysVarClassType sys_var_id = SYS_VAR_INVALID;
  7. if (OB_UNLIKELY(SYS_VAR_INVALID == (
  8. sys_var_id = ObSysVarFactory::find_sys_var_id_by_name(sys_var_name)))) {
  9. ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
  10. LOG_WARN("fail to find sys var id by name", K(ret), K(sys_var_name), K(lbt()));
  11. } else if (OB_FAIL(ObSysVarFactory::calc_sys_var_store_idx(sys_var_id, store_idx))) {
  12. LOG_WARN("fail to calc sys var store idx", K(ret), K(sys_var_id), K(sys_var_name), K(lbt()));
  13. } else if (OB_UNLIKELY(store_idx < 0) ||
  14. OB_UNLIKELY(store_idx >= ObSysVarFactory::ALL_SYS_VARS_COUNT)) {
  15. ret = OB_ERR_UNEXPECTED;
  16. LOG_ERROR("got store_idx is invalid", K(ret), K(store_idx));
  17. } else if (OB_ISNULL(sys_vars_[store_idx])) {
  18. ret = OB_ENTRY_NOT_EXIST;
  19. LOG_WARN("sys var is NULL", K(ret), K(store_idx), K(sys_var_name));
  20. } else {
  21. sys_var = sys_vars_[store_idx];
  22. }
  23. return ret;
  24. }

内部获取

获取 global 变量

内部访问 global 变量同样是通过 schema 获取,先从全局的 GCTX.schema_service_中获取 schema_guard,然后通过变量名获取 var_schema,再逐步解析得到最终的基础数据类型。

例如获取 weak_read_version_refresh_interval:

  1. int ObRootService::check_weak_read_version_refresh_interval(int64_t refresh_interval, bool &valid)
  2. {
  3. ......
  4. } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
  5. LOG_WARN("get schema guard failed", KR(ret), K(tenant_id));
  6. } else if (OB_FAIL(schema_guard.get_tenant_system_variable(tenant_id,
  7. OB_SV_MAX_READ_STALE_TIME, var_schema))) {
  8. LOG_WARN("get tenant system variable failed", KR(ret), K(tenant_id));
  9. } else if (OB_ISNULL(var_schema)) {
  10. ret = OB_ERR_UNEXPECTED;
  11. LOG_WARN("var schema is null", KR(ret), K(tenant_id));
  12. } else if (OB_FAIL(var_schema->get_value(NULL, NULL, obj))) {
  13. LOG_WARN("get value failed", KR(ret), K(tenant_id), K(obj));
  14. } else if (OB_FAIL(obj.get_int(session_max_stale_time))) {
  15. LOG_WARN("get int failed", KR(ret), K(tenant_id), K(obj));
  16. } else if (session_max_stale_time != share::ObSysVarFactory::INVALID_MAX_READ_STALE_TIME
  17. && refresh_interval > session_max_stale_time) {

通过这种方式获取变量,通常会定义一个变量别名,方便代码中进行访问。

  1. namespace oceanbase
  2. {
  3. namespace share
  4. {
  5. static const char* const OB_SV_ENABLE_RICH_ERROR_MSG = "ob_enable_rich_error_msg";
  6. static const char* const OB_SV_LOG_ROW_VALUE_OPTIONS = "log_row_value_options";
  7. static const char* const OB_SV_MAX_READ_STALE_TIME = "ob_max_read_stale_time";
  8. ......

获取 session 变量最新值

内部查询调用 get_sys_var() 函数通过下标 idx 获取 session 变量的值。

  1. ObBasicSysVar *ObBasicSessionInfo::get_sys_var(const int64_t idx)
  2. {
  3. ObBasicSysVar *var = NULL;
  4. if (idx >= 0 && idx < ObSysVarFactory::ALL_SYS_VARS_COUNT) {
  5. var = sys_vars_[idx];
  6. }
  7. return var;
  8. }

获取 session 变量缓存值

获取变量缓存值,就是从 ObBasicSessionInfo::sys_vars_cache_数组中取值,通常是封装了一些函数,然后再调用 sys_vars_cache_ 的 get_xxx() 接口,这些接口就是用 DEF_SYS_VAR_CACHE_FUNCS 宏定义的。

举几个例子:

  1. class ObBasicSessionInfo
  2. {
  3. int64_t get_trx_lock_timeout() const
  4. {
  5. return sys_vars_cache_.get_ob_trx_lock_timeout();
  6. }
  7. int64_t get_ob_max_read_stale_time() {
  8. return sys_vars_cache_.get_ob_max_read_stale_time();
  9. }
  10. int get_sql_throttle_current_priority(int64_t &sql_throttle_current_priority)
  11. {
  12. sql_throttle_current_priority = sys_vars_cache_.get_sql_throttle_current_priority();
  13. return common::OB_SUCCESS;
  14. }

只要能够访问 session,就可以访问 session 中的变量,比如获取 ob_max_read_stale_time:

session->get_ob_max_read_stale_time();

变量更新

外部修改

global 变量修改流程

更新 global 变量就是更新其对应的 schema,执行如“set global xxx = xx”的命令时触发以下调用流程:

  • ObMPQuery::do_process()
  • ObVariableSetExecutor::execute(ObExecContext &ctx, ObVariableSetStmt &stmt)
  • ObVariableSetExecutor::update_global_variables()

该函数先对更新数据做合法性检查,然后构造一个新的 sysvar_schema 加入到 rpc 参数中,最后发送更新系统变量的请求到其他节点。

  1. int ObVariableSetExecutor::update_global_variables(ObExecContext &ctx,
  2. ObDDLStmt &stmt,
  3. const ObSetVar &set_var,
  4. const ObObj &val)
  5. {
  6. // 合法性检查
  7. } else if (set_var.var_name_ == OB_SV_MAX_READ_STALE_TIME) {
  8. int64_t max_read_stale_time = 0;
  9. if (OB_FAIL(val.get_int(max_read_stale_time))) {
  10. LOG_WARN("fail to get int value", K(ret), K(val));
  11. } else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
  12. max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
  13. ret = OB_INVALID_ARGUMENT;
  14. LOG_USER_ERROR(OB_INVALID_ARGUMENT,
  15. "max_read_stale_time is smaller than weak_read_version_refresh_interval");
  16. }
  17. // 构造新的 sysvar_schema,添加到参数 arg 的 sys_var_list_ 列表中
  18. if (OB_SUCC(ret)) {
  19. ......
  20. } else if (OB_FAIL(sysvar_schema.set_name(set_var.var_name_))) {
  21. LOG_WARN("set sysvar schema name failed", K(ret));
  22. } else if (OB_FAIL(sysvar_schema.set_value(val_str))) {
  23. LOG_WARN("set sysvar schema value failed", K(ret));
  24. } else {
  25. sysvar_schema.set_tenant_id(arg.tenant_id_);
  26. if (OB_FAIL(arg.sys_var_list_.push_back(sysvar_schema))) {
  27. LOG_WARN("store sys var to array failed", K(ret));
  28. }
  29. }
  30. }
  31. // 想其他节点发送更新变量的rpc请求
  32. if (OB_SUCC(ret)) {
  33. if (OB_ISNULL(task_exec_ctx = GET_TASK_EXECUTOR_CTX(ctx)) ||
  34. OB_ISNULL(common_rpc_proxy = task_exec_ctx->get_common_rpc())) {
  35. ret = OB_NOT_INIT;
  36. LOG_WARN("task exec ctx or common rpc proxy is NULL", K(ret), K(task_exec_ctx), K(common_rpc_proxy));
  37. } else if (OB_FAIL(common_rpc_proxy->modify_system_variable(arg))) {
  38. LOG_WARN("rpc proxy alter system variable failed", K(ret));
  39. } else {}
  40. }
  41. return ret;
  42. }

session 变量修改流程

更新session变量值的同时,还会更新对应缓存数据的值,执行如“set xxx = xx”的命令时会触发以下调用流程:

  • ObBasicSysVar::session_update()
  • ObBasicSessionInfo::update_sys_variable_by_name()

通过变量名 var 获取 var_id,然后调用 update_sys_variable() 传入 val。

  1. int ObBasicSessionInfo::update_sys_variable_by_name(const ObString &var, const ObObj &val)
  2. {
  3. int ret = OB_SUCCESS;
  4. ObSysVarClassType var_id = SYS_VAR_INVALID;
  5. if (var.empty()) {
  6. ret = OB_INVALID_ARGUMENT;
  7. LOG_WARN("invalid variable name", K(var), K(val), K(ret));
  8. } else if (SYS_VAR_INVALID == (var_id = ObSysVarFactory::find_sys_var_id_by_name(var))) {
  9. ret = OB_ERR_SYS_VARIABLE_UNKNOWN;
  10. LOG_WARN("unknown variable", K(var), K(val), K(ret));
  11. } else if (OB_FAIL(update_sys_variable(var_id, val))) {
  12. LOG_WARN("failed to update sys variable", K(var), K(val), K(ret));
  13. } else {}
  14. return ret;
  15. }
  • ObBasicSessionInfo::update_sys_variable()

先记录变量旧值,然后更新 session 变量缓存值,再更新 sys_var 的值,也就是 sys_vars_[store_idx] 的值。

  1. int ObBasicSessionInfo::update_sys_variable(const ObSysVarClassType sys_var_id, const ObObj &val)
  2. {
  3. } else if (is_track_session_info()) {
  4. // 记录修改变量的旧值
  5. if (OB_FAIL(track_sys_var(sys_var_id, sys_var->get_value()))) {
  6. LOG_WARN("failed to track sys var", K(ret), K(sys_var_id), K(val));
  7. if (OB_SUCC(ret)) {
  8. if (OB_FAIL(process_session_variable(sys_var_id, val, false /*check_timezone_valid*/,
  9. true /*is_update_sys_var*/))) {
  10. LOG_WARN("process system variable error", K(sys_var_id), K(val), K(ret));
  11. } else {
  12. sys_var->set_value(val);
  • ObBasicSessionInfo::process_session_variable()

基于传入的变量类型 var,更新对应 session 变量的缓存值,即 inc_data_.xxx 。

  1. OB_INLINE int ObBasicSessionInfo::process_session_variable(ObSysVarClassType var, const ObObj &val,
  2. const bool check_timezone_valid/*true*/, const bool is_update_sys_var/*false*/)
  3. {
  4. int ret = OB_SUCCESS;
  5. switch (var) {
  6. case SYS_VAR_OB_MAX_READ_STALE_TIME: {
  7. int64_t max_read_stale_time = 0;
  8. if (OB_FAIL(val.get_int(max_read_stale_time))) {
  9. LOG_WARN("fail to get int value", K(ret), K(val));
  10. } else if (max_read_stale_time != ObSysVarFactory::INVALID_MAX_READ_STALE_TIME &&
  11. max_read_stale_time < GCONF.weak_read_version_refresh_interval) {
  12. ret = OB_INVALID_ARGUMENT;
  13. LOG_USER_ERROR(OB_INVALID_ARGUMENT,
  14. "max_read_stale_time is smaller than weak_read_version_refresh_interval");
  15. } else {
  16. sys_vars_cache_.set_ob_max_read_stale_time(max_read_stale_time);
  17. }
  18. break;
  19. }

内部更新

内部更新主要是指系统变量在各节点之间的同步机制。当一个节点执行 global 变量更新操作之后,其他节点刷新 schema 的流程:

  • ObServerSchemaUpdater::process_refresh_task(const ObServerSchemaTask &task)
  • ObMultiVersionSchemaService::refresh_and_add_schema()
  • ObMultiVersionSchemaService::refresh_tenant_schema(const uint64_t tenant_id)
  • ObServerSchemaService::refresh_schema(const ObRefreshSchemaStatus &schema_status)
  1. int ObServerSchemaService::refresh_schema(
  2. const ObRefreshSchemaStatus &schema_status)
  3. {
  4. } else if (is_full_schema) {
  5. if (OB_FAIL(refresh_full_schema(schema_status))) {
  6. LOG_WARN("tenant refresh full schema failed", K(ret), K(schema_status));
  7. }
  8. } else {
  9. if (OB_FAIL(refresh_increment_schema(schema_status))) {
  10. LOG_WARN("tenant refresh increment schema failed", K(ret), K(schema_status));
  11. }
  • ObServerSchemaService::refresh_increment_schema()

调用 get_increment_schema_operations() 获取增量 schema 操作 schema_operations,然后调用 replay_log() 回放日志,再调用 update_schema_mgr() 更新 schema。

  1. int ObServerSchemaService::refresh_increment_schema(
  2. const ObRefreshSchemaStatus &schema_status)
  3. {
  4. if (OB_SUCC(ret) && !core_schema_change && !sys_schema_change) {
  5. const int64_t fetch_version = std::max(core_schema_version, schema_version);
  6. if (OB_FAIL(schema_mgr_for_cache_map_.get_refactored(tenant_id, schema_mgr_for_cache))) {
  7. LOG_WARN("fail to get schema_mgr_for_cache", KR(ret), K(schema_status));
  8. } else if (OB_ISNULL(schema_mgr_for_cache)) {
  9. ret = OB_ERR_UNEXPECTED;
  10. LOG_WARN("schema mgr for cache is null", KR(ret));
  11. } else if (OB_FAIL(schema_service_->get_increment_schema_operations(schema_status,
  12. local_schema_version, fetch_version, sql_client, schema_operations))) {
  13. LOG_WARN("get_increment_schema_operations failed", KR(ret), K(schema_status),
  14. K(local_schema_version), K(fetch_version));
  15. } else if (schema_operations.count() > 0) {
  16. // new cache
  17. SMART_VAR(AllSchemaKeys, all_keys) {
  18. if (OB_FAIL(replay_log(*schema_mgr_for_cache, schema_operations, all_keys))) {
  19. LOG_WARN("replay_log failed", KR(ret), K(schema_status), K(schema_operations));
  20. } else if (OB_FAIL(update_schema_mgr(sql_client, schema_status,
  21. *schema_mgr_for_cache, fetch_version, all_keys))){
  22. LOG_WARN("update schema mgr failed", KR(ret), K(schema_status));
  23. }
  24. }
  25. } else {}
  26. if (OB_SUCC(ret)) {
  27. schema_mgr_for_cache->set_schema_version(fetch_version);
  28. if (OB_FAIL(publish_schema(tenant_id))) {
  29. LOG_WARN("publish_schema failed", KR(ret), K(schema_status));
  30. } else {
  31. LOG_INFO("change schema version", K(schema_status), K(schema_version), K(core_schema_version));
  32. break;
  33. }
  34. }

小结

相比前面介绍过的配置项,系统变量的内部机制更加复杂,一个系统变量不仅同时具有 global 级别和 session 级别的副本,还可能在 session 上定义了一份缓存值。在新增系统变量时,要根据实际需求去定义变量的标签和缓存,在使用过程中也要注意取到的值是不是符合预期。

目前对配置项和系统变量的源码解析基本完成,前文中也偶尔提到过“合法性检查”这一概念,在下一篇文章中将会介绍它们是如何进行合法性检查的,以及相关的函数和规则是怎样定义的,欢迎大家持续关注。

参考文档

  1. 系统变量总览
  2. 什么是系统变量?如何使用系统变量?
  3. OceanBase源码
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号