赞
踩
本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本。
以 update dtea set id = 1;
这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程。
SQL流程如下:
对应的代码:
exec_simple_query(const char *query_string)
// ------- 解析器部分--------------
--> pg_parse_query(query_string); //生成语法解析树
--> pg_analyze_and_rewrite(parsetree, query_string,NULL, 0, NULL); // 生成查询树Query
--> parse_analyze(parsetree, query_string, paramTypes, numParams,queryEnv); // 语义分析
--> pg_rewrite_query(query); // 规则重写
// --------优化器----------
--> pg_plan_queries()
//-------- 执行器----------
--> PortalStart(portal, NULL, 0, InvalidSnapshot);
--> PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc); // 执行器执行
--> PortalDrop(portal, false);
关键数据结构:UpdateStmt
、RangeVar
、ResTarget
:
/* Update Statement */ typedef struct UpdateStmt { NodeTag type; RangeVar *relation; /* relation to update */ List *targetList; /* the target list (of ResTarget) */ // 对应语句中的set id = 0;信息在这里 Node *whereClause; /* qualifications */ List *fromClause; /* optional from clause for more tables */ List *returningList; /* list of expressions to return */ WithClause *withClause; /* WITH clause */ } UpdateStmt; // dtea 表 typedef struct RangeVar { NodeTag type; char *catalogname; /* the catalog (database) name, or NULL */ char *schemaname; /* the schema name, or NULL */ char *relname; /* the relation/sequence name */ bool inh; /* expand rel by inheritance? recursively act * on children? */ char relpersistence; /* see RELPERSISTENCE_* in pg_class.h */ Alias *alias; /* table alias & optional column aliases */ int location; /* token location, or -1 if unknown */ } RangeVar; // set id = 0; 经transformTargetList() -> transformTargetEntry,会转为TargetEntry typedef struct ResTarget { NodeTag type; char *name; /* column name or NULL */ // id column List *indirection; /* subscripts, field names, and '*', or NIL */ Node *val; /* the value expression to compute or assign */ // = 1表达式节点存在这里 int location; /* token location, or -1 if unknown */ } ResTarget;
用户输入的update语句 update dtea set id = 1
由字符串会转为可由数据库理解的内部数据结构语法解析树 UpdateStmt
。执行逻辑在 pg_parse_query(query_string);
中,需要理解flex与bison。
gram.y中Update语法的定义:
/***************************************************************************** * QUERY: * UpdateStmt (UPDATE) *****************************************************************************/ //结合这条语句分析 update dtea set id = 0; UpdateStmt: opt_with_clause UPDATE relation_expr_opt_alias SET set_clause_list from_clause where_or_current_clause returning_clause { UpdateStmt *n = makeNode(UpdateStmt); n->relation = $3; n->targetList = $5; n->fromClause = $6; n->whereClause = $7; n->returningList = $8; n->withClause = $1; $$ = (Node *)n; } ; set_clause_list: set_clause { $$ = $1; } | set_clause_list ',' set_clause { $$ = list_concat($1,$3); } ; // 对应的是 set id = 0 set_clause: // id = 0 set_target '=' a_expr { $1->val = (Node *) $3; $$ = list_make1($1); } | '(' set_target_list ')' '=' a_expr { int ncolumns = list_length($2); int i = 1; ListCell *col_cell; foreach(col_cell, $2) /* Create a MultiAssignRef source for each target */ { ResTarget *res_col = (ResTarget *) lfirst(col_cell); MultiAssignRef *r = makeNode(MultiAssignRef); r->source = (Node *) $5; r->colno = i; r->ncolumns = ncolumns; res_col->val = (Node *) r; i++; } $$ = $2; } ; set_target: ColId opt_indirection { $$ = makeNode(ResTarget); $$->name = $1; $$->indirection = check_indirection($2, yyscanner); $$->val = NULL; /* upper production sets this */ $$->location = @1; } ; set_target_list: set_target { $$ = list_make1($1); } | set_target_list ',' set_target { $$ = lappend($1,$3); } ;
生成了 UpdateStmt
后, 会经由 parse_analyze
语义分析,生成查询树 Query
,以供后续优化器生成执行计划。主要代码在 src/backent/parser/analyze.c
中
analyze.c : transform the raw parse tree into a query tree
parse_analyze()
--> transformTopLevelStmt(pstate, parseTree);
--> transformOptionalSelectInto(pstate, parseTree->stmt);
--> transformStmt(pstate, parseTree);
// transforms an update statement
--> transformUpdateStmt(pstate, (UpdateStmt *) parseTree); // 实际由UpdateStmt转为Query的处理函数
具体的我们看一下 transformUpdateStmt
函数实现:
/* transformUpdateStmt - transforms an update statement */ static Query *transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) { Query *qry = makeNode(Query); ParseNamespaceItem *nsitem; Node *qual; qry->commandType = CMD_UPDATE; pstate->p_is_insert = false; /* process the WITH clause independently of all else */ if (stmt->withClause) { qry->hasRecursive = stmt->withClause->recursive; qry->cteList = transformWithClause(pstate, stmt->withClause); qry->hasModifyingCTE = pstate->p_hasModifyingCTE; } qry->resultRelation = setTargetTable(pstate, stmt->relation, stmt->relation->inh, true, ACL_UPDATE); nsitem = pstate->p_target_nsitem; /* subqueries in FROM cannot access the result relation */ nsitem->p_lateral_only = true; nsitem->p_lateral_ok = false; /* the FROM clause is non-standard SQL syntax. We used to be able to do this with REPLACE in POSTQUEL so we keep the feature.*/ transformFromClause(pstate, stmt->fromClause); /* remaining clauses can reference the result relation normally */ nsitem->p_lateral_only = false; nsitem->p_lateral_ok = true; qual = transformWhereClause(pstate, stmt->whereClause,EXPR_KIND_WHERE, "WHERE"); qry->returningList = transformReturningList(pstate, stmt->returningList); /* Now we are done with SELECT-like processing, and can get on with * transforming the target list to match the UPDATE target columns.*/ qry->targetList = transformUpdateTargetList(pstate, stmt->targetList); // 处理SQL语句中的 set id =1 qry->rtable = pstate->p_rtable; qry->jointree = makeFromExpr(pstate->p_joinlist, qual); qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; assign_query_collations(pstate, qry); return qry; }
这里面要重点关注一下 transformTargetList
,会将抽象语法树中的 ResTarget
转为查询器的 TargetEntry
。
typedef struct TargetEntry
{
Expr xpr;
Expr *expr; /* expression to evaluate */
AttrNumber resno; /* attribute number (see notes above) */
char *resname; /* name of the column (could be NULL) */
Index ressortgroupref; /* nonzero if referenced by a sort/group clause */
Oid resorigtbl; /* OID of column's source table */
AttrNumber resorigcol; /* column's number in source table */
bool resjunk; /* set to true to eliminate the attribute from final target list */
} TargetEntry;
对于其内部处理可参考源码
src/backend/parser
中的相关处理,这里不再细述。需要重点阅读一下README,PG源码中所有的README都是非常好的资料,一定要认真读。
这块的内容很多,主要的逻辑是先进行逻辑优化,比如子查询、子链接、常量表达式、选择下推等等的处理,因为我们要分析的这条语句十分简单,所以逻辑优化的这部分都没有涉及到。物理优化,涉及到选择率,代价估计,索引扫描还是顺序扫描,选择那种连接方式,应用动态规划呢还是基因算法,选择nestloop-join、merge-join还是hash-join等。因为我们这个表没有建索引,更新单表也不涉及到多表连接,所以物理优化这块涉及的也不多。路径生成,生成最佳路径,再由最佳路径生成执行计划。
在路径生成这块,最基础的是对表的扫描方式,比如顺序扫描、索引扫描,再往上是连接方式,采用那种连接方式,再往上是比如排序、Limit等路径…,由底向上生成路径。我们要分析的语句很简单,没有其他处理,就顺序扫描再更新就可以了。
这里先不考虑并行执行计划。我们先看一下其执行计划结果:
postgres@postgres=# explain update dtea set id = 0;
QUERY PLAN
--------------------------------------------------------------
Update on dtea (cost=0.00..19.00 rows=900 width=68)
-> Seq Scan on dtea (cost=0.00..19.00 rows=900 width=68)
(2 rows)
下面我们分析一下其执行计划的生成流程:
// 由查询树Query--> Path --> Plan (PlannedStmt) pg_plan_queries() --> pg_plan_query() --> planner() --> standard_planner(Query *parse, const char *query_string, int cursorOptions,ParamListInfo boundParams) // 由Query---> PlannerInfo --> subquery_planner(glob, parse, NULL,false, tuple_fraction); // 涉及到很多逻辑优化的内容,很多不列出 --> pull_up_sublinks(root); --> pull_up_subqueries(root); // 这里只列出几个重要的逻辑优化内容,其他的不再列出...... // 如果是update/delete分区表继承表则走inheritance_planner(),其他情况走grouping_planner() --> inheritance_planner() // update/delete分区表继承表的情况 --> grouping_planner() --> grouping_planner() // 非分区表、继承表的情况 --> preprocess_targetlist(root); // update虽然只更新一列,但是插入一条新元组的时候,需要知道其他列信息. --> rewriteTargetListUD(parse, target_rte, target_relation); --> expand_targetlist() --> query_planner(root, standard_qp_callback, &qp_extra); // 重要 --> add_base_rels_to_query() --> deconstruct_jointree(root); --> add_other_rels_to_query(root); // 展开分区表到PlannerInfo中的相关字段中 --> expand_inherited_rtentry() --> expand_planner_arrays(root, num_live_parts); --> make_one_rel(root, joinlist); --> set_base_rel_sizes(root); --> set_rel_size(); --> set_append_rel_size(root, rel, rti, rte); // 如果是分区表或者继承走这里,否则走下面 --> set_rel_size(root, childrel, childRTindex, childRTE); // 处理子分区表 --> set_plain_rel_size(root, rel, rte); --> set_plain_rel_size() // 如果不是分区表或者继承 --> set_baserel_size_estimates() --> set_base_rel_pathlists(root); --> set_rel_pathlist(root, rel, rti, root->simple_rte_array[rti]); --> set_append_rel_pathlist(root, rel, rti, rte); // 生成各分区表的访问路径 --> make_rel_from_joinlist(root, joinlist);// 动态规划还是基因规划 --> standard_join_search() // 动态规划 --> geqo() // 基因规划与动态规划二选一 --> apply_scanjoin_target_to_paths() --> create_modifytable_path() // 由PlannerInfo---> RelOptInfo --> fetch_upper_rel(root, UPPERREL_FINAL, NULL); // 由RelOptInfo---> Path --> get_cheapest_fractional_path(final_rel, tuple_fraction); // 由 PlannerInfo+Path ---> Plan --> create_plan(root, best_path); // 后续处理,由Plan ---> PlannedStmt
核心数据结构: PlannedStmt、PlannerInfo、RelOptInfo(存储访问路径及其代价)、Path
Path
:所有的路径都继承自 Path
,所以这个比较重要。
typedef struct Path { NodeTag type; NodeTag pathtype; /* tag identifying scan/join method */ RelOptInfo *parent; /* the relation this path can build */ PathTarget *pathtarget; /* list of Vars/Exprs, cost, width */ ParamPathInfo *param_info; /* parameterization info, or NULL if none */ bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ int parallel_workers; /* desired # of workers; 0 = not parallel */ /* estimated size/costs for path (see costsize.c for more info) */ double rows; /* estimated number of result tuples */ Cost startup_cost; /* cost expended before fetching any tuples */ Cost total_cost; /* total cost (assuming all tuples fetched) */ List *pathkeys; /* sort ordering of path's output */ /* pathkeys is a List of PathKey nodes; see above */ } Path; /* ModifyTablePath represents performing INSERT/UPDATE/DELETE modifications * We represent most things that will be in the ModifyTable plan node * literally, except we have child Path(s) not Plan(s). But analysis of the * OnConflictExpr is deferred to createplan.c, as is collection of FDW data. */ typedef struct ModifyTablePath { Path path; // 可以看到ModifyTablePath继承自Path CmdType operation; /* INSERT, UPDATE, or DELETE */ bool canSetTag; /* do we set the command tag/es_processed? */ Index nominalRelation; /* Parent RT index for use of EXPLAIN */ Index rootRelation; /* Root RT index, if target is partitioned */ bool partColsUpdated; /* some part key in hierarchy updated */ List *resultRelations; /* integer list of RT indexes */ List *subpaths; /* Path(s) producing source data */ List *subroots; /* per-target-table PlannerInfos */ List *withCheckOptionLists; /* per-target-table WCO lists */ List *returningLists; /* per-target-table RETURNING tlists */ List *rowMarks; /* PlanRowMarks (non-locking only) */ OnConflictExpr *onconflict; /* ON CONFLICT clause, or NULL */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ } ModifyTablePath;
生成update执行路径,最终都是要生成ModifyTablePath,本例中路径生成过程: Path–>ProjectionPath–>ModifyTablePath,也就是先顺序扫描表,再修改表。后面由路径生成执行计划。
/* create_modifytable_path * Creates a pathnode that represents performing INSERT/UPDATE/DELETE mods * * 'rel' is the parent relation associated with the result * 'resultRelations' is an integer list of actual RT indexes of target rel(s) * 'subpaths' is a list of Path(s) producing source data (one per rel) * 'subroots' is a list of PlannerInfo structs (one per rel)*/ ModifyTablePath *create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, CmdType operation, bool canSetTag, Index nominalRelation, Index rootRelation, bool partColsUpdated, List *resultRelations, List *subpaths, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, int epqParam) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); double total_size; ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); Assert(list_length(resultRelations) == list_length(subroots)); Assert(withCheckOptionLists == NIL || list_length(resultRelations) == list_length(withCheckOptionLists)); Assert(returningLists == NIL || list_length(resultRelations) == list_length(returningLists)); pathnode->path.pathtype = T_ModifyTable; pathnode->path.parent = rel; pathnode->path.pathtarget = rel->reltarget; /* pathtarget is not interesting, just make it minimally valid */ /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; pathnode->path.parallel_safe = false; pathnode->path.parallel_workers = 0; pathnode->path.pathkeys = NIL; /** Compute cost & rowcount as sum of subpath costs & rowcounts. * * Currently, we don't charge anything extra for the actual table * modification work, nor for the WITH CHECK OPTIONS or RETURNING * expressions if any. It would only be window dressing, since * ModifyTable is always a top-level node and there is no way for the * costs to change any higher-level planning choices. But we might want * to make it look better sometime.*/ pathnode->path.startup_cost = 0; pathnode->path.total_cost = 0; pathnode->path.rows = 0; total_size = 0; foreach(lc, subpaths) { Path *subpath = (Path *) lfirst(lc); if (lc == list_head(subpaths)) /* first node? */ pathnode->path.startup_cost = subpath->startup_cost; pathnode->path.total_cost += subpath->total_cost; pathnode->path.rows += subpath->rows; total_size += subpath->pathtarget->width * subpath->rows; } /* Set width to the average width of the subpath outputs. XXX this is * totally wrong: we should report zero if no RETURNING, else an average * of the RETURNING tlist widths. But it's what happened historically, * and improving it is a task for another day.*/ if (pathnode->path.rows > 0) total_size /= pathnode->path.rows; pathnode->path.pathtarget->width = rint(total_size); pathnode->operation = operation; pathnode->canSetTag = canSetTag; pathnode->nominalRelation = nominalRelation; pathnode->rootRelation = rootRelation; pathnode->partColsUpdated = partColsUpdated; pathnode->resultRelations = resultRelations; pathnode->subpaths = subpaths; pathnode->subroots = subroots; pathnode->withCheckOptionLists = withCheckOptionLists; pathnode->returningLists = returningLists; pathnode->rowMarks = rowMarks; pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; return pathnode; }
现在我们生成了最优的update路径,需要由路径生成执行计划:
Plan *create_plan(PlannerInfo *root, Path *best_path) { Plan *plan; Assert(root->plan_params == NIL); /* plan_params should not be in use in current query level */ /* Initialize this module's workspace in PlannerInfo */ root->curOuterRels = NULL; root->curOuterParams = NIL; /* Recursively process the path tree, demanding the correct tlist result */ plan = create_plan_recurse(root, best_path, CP_EXACT_TLIST); // 实际实现是在这里 if (!IsA(plan, ModifyTable)) apply_tlist_labeling(plan->targetlist, root->processed_tlist); SS_attach_initplans(root, plan); /* Check we successfully assigned all NestLoopParams to plan nodes */ if (root->curOuterParams != NIL) elog(ERROR, "failed to assign all NestLoopParams to plan nodes"); /** Reset plan_params to ensure param IDs used for nestloop params are not re-used later*/ root->plan_params = NIL; return plan; } // 由最佳路径生成最佳执行计划 static ModifyTable *create_modifytable_plan(PlannerInfo *root, ModifyTablePath *best_path) { ModifyTable *plan; List *subplans = NIL; ListCell *subpaths, *subroots; /* Build the plan for each input path */ forboth(subpaths, best_path->subpaths, subroots, best_path->subroots) { Path *subpath = (Path *) lfirst(subpaths); PlannerInfo *subroot = (PlannerInfo *) lfirst(subroots); Plan *subplan; /* In an inherited UPDATE/DELETE, reference the per-child modified * subroot while creating Plans from Paths for the child rel. This is * a kluge, but otherwise it's too hard to ensure that Plan creation * functions (particularly in FDWs) don't depend on the contents of * "root" matching what they saw at Path creation time. The main * downside is that creation functions for Plans that might appear * below a ModifyTable cannot expect to modify the contents of "root" * and have it "stick" for subsequent processing such as setrefs.c. * That's not great, but it seems better than the alternative.*/ subplan = create_plan_recurse(subroot, subpath, CP_EXACT_TLIST); /* Transfer resname/resjunk labeling, too, to keep executor happy */ apply_tlist_labeling(subplan->targetlist, subroot->processed_tlist); subplans = lappend(subplans, subplan); } plan = make_modifytable(root,best_path->operation,best_path->canSetTag, best_path->nominalRelation,best_path->rootRelation, best_path->partColsUpdated,best_path->resultRelations, subplans,best_path->subroots,best_path->withCheckOptionLists, best_path->returningLists,best_path->rowMarks, best_path->onconflict,best_path->epqParam); copy_generic_path_info(&plan->plan, &best_path->path); return plan; }
最终的执行计划是 ModifyTable
:
/* ---------------- * ModifyTable node - * Apply rows produced by subplan(s) to result table(s), * by inserting, updating, or deleting. * ----------------*/ typedef struct ModifyTable { Plan plan; CmdType operation; /* INSERT, UPDATE, or DELETE */ bool canSetTag; /* do we set the command tag/es_processed? */ Index nominalRelation; /* Parent RT index for use of EXPLAIN */ Index rootRelation; /* Root RT index, if target is partitioned */ bool partColsUpdated; /* some part key in hierarchy updated */ List *resultRelations; /* integer list of RT indexes */ int resultRelIndex; /* index of first resultRel in plan's list */ int rootResultRelIndex; /* index of the partitioned table root */ List *plans; /* plan(s) producing source data */ List *withCheckOptionLists; /* per-target-table WCO lists */ List *returningLists; /* per-target-table RETURNING tlists */ List *fdwPrivLists; /* per-target-table FDW private data lists */ Bitmapset *fdwDirectModifyPlans; /* indices of FDW DM plans */ List *rowMarks; /* PlanRowMarks (non-locking only) */ int epqParam; /* ID of Param for EvalPlanQual re-eval */ OnConflictAction onConflictAction; /* ON CONFLICT action */ List *arbiterIndexes; /* List of ON CONFLICT arbiter index OIDs */ List *onConflictSet; /* SET for INSERT ON CONFLICT DO UPDATE */ Node *onConflictWhere; /* WHERE for ON CONFLICT UPDATE */ Index exclRelRTI; /* RTI of the EXCLUDED pseudo relation */ List *exclRelTlist; /* tlist of the EXCLUDED pseudo relation */ } ModifyTable;
根据上面的执行计划,去执行。主要是各种算子的实现,其中要理解执行器的运行原理,主要是火山模型,一次一元组。我们看一下其调用过程。
CreatePortal("", true, true); PortalDefineQuery(portal,NULL,query_string,commandTag,plantree_list,NULL); PortalStart(portal, NULL, 0, InvalidSnapshot); PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc); --> PortalRunMulti() --> ProcessQuery() --> ExecutorStart(queryDesc, 0); --> standard_ExecutorStart() --> estate = CreateExecutorState(); // 创建EState --> estate->es_output_cid = GetCurrentCommandId(true); // 获得cid,后面更新的时候要用 --> InitPlan(queryDesc, eflags); --> ExecInitNode(plan, estate, eflags); --> ExecInitModifyTable() // 初始化ModifyTableState --> ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); --> standard_ExecutorRun() --> ExecutePlan() --> ExecProcNode(planstate); // 一次一元组 火山模型 --> node->ExecProcNode(node); --> ExecProcNodeFirst(PlanState *node) --> node->ExecProcNode(node); --> ExecModifyTable(PlanState *pstate) --> ExecUpdate() --> table_tuple_update(Relation rel, ......) --> rel->rd_tableam->tuple_update() --> heapam_tuple_update(Relation relation, ......) --> heap_update(relation, otid, tuple, cid, ......) --> ExecutorFinish(queryDesc); --> ExecutorEnd(queryDesc); PortalDrop(portal, false);
关键数据结构:
typedef struct ModifyTableState { PlanState ps; /* its first field is NodeTag */ CmdType operation; /* INSERT, UPDATE, or DELETE */ bool canSetTag; /* do we set the command tag/es_processed? */ bool mt_done; /* are we done? */ PlanState **mt_plans; /* subplans (one per target rel) */ int mt_nplans; /* number of plans in the array */ int mt_whichplan; /* which one is being executed (0..n-1) */ TupleTableSlot **mt_scans; /* input tuple corresponding to underlying * plans */ ResultRelInfo *resultRelInfo; /* per-subplan target relations */ ResultRelInfo *rootResultRelInfo; /* root target relation (partitioned * table root) */ List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ bool fireBSTriggers; /* do we need to fire stmt triggers? */ /* Slot for storing tuples in the root partitioned table's rowtype during * an UPDATE of a partitioned table. */ TupleTableSlot *mt_root_tuple_slot; struct PartitionTupleRouting *mt_partition_tuple_routing; /* Tuple-routing support info */ struct TransitionCaptureState *mt_transition_capture; /* controls transition table population for specified operation */ /* controls transition table population for INSERT...ON CONFLICT UPDATE */ struct TransitionCaptureState *mt_oc_transition_capture; /* Per plan map for tuple conversion from child to root */ TupleConversionMap **mt_per_subplan_tupconv_maps; } ModifyTableState;
核心执行算子实现:
static TupleTableSlot *ExecModifyTable(PlanState *pstate) { ModifyTableState *node = castNode(ModifyTableState, pstate); PartitionTupleRouting *proute = node->mt_partition_tuple_routing; EState *estate = node->ps.state; CmdType operation = node->operation; ResultRelInfo *saved_resultRelInfo; ResultRelInfo *resultRelInfo; PlanState *subplanstate; JunkFilter *junkfilter; TupleTableSlot *slot; TupleTableSlot *planSlot; ItemPointer tupleid; ItemPointerData tuple_ctid; HeapTupleData oldtupdata; HeapTuple oldtuple; if (estate->es_epq_active != NULL) elog(ERROR, "ModifyTable should not be called during EvalPlanQual"); if (node->mt_done) return NULL; /* On first call, fire BEFORE STATEMENT triggers before proceeding.*/ if (node->fireBSTriggers) { fireBSTriggers(node); node->fireBSTriggers = false; } /* Preload local variables */ resultRelInfo = node->resultRelInfo + node->mt_whichplan; subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = resultRelInfo->ri_junkFilter; saved_resultRelInfo = estate->es_result_relation_info; estate->es_result_relation_info = resultRelInfo; /* Fetch rows from subplan(s), and execute the required table modification for each row.*/ for (;;) { /* Reset the per-output-tuple exprcontext. This is needed because * triggers expect to use that context as workspace. It's a bit ugly * to do this below the top level of the plan, however. We might need to rethink this later.*/ ResetPerTupleExprContext(estate); /* Reset per-tuple memory context used for processing on conflict and * returning clauses, to free any expression evaluation storage allocated in the previous cycle. */ if (pstate->ps_ExprContext) ResetExprContext(pstate->ps_ExprContext); planSlot = ExecProcNode(subplanstate); if (TupIsNull(planSlot)) { /* advance to next subplan if any */ node->mt_whichplan++; // 分区表的update,每个分区分布对应一个subplan,当执行完一个分区再执行下一个分区 if (node->mt_whichplan < node->mt_nplans) { resultRelInfo++; subplanstate = node->mt_plans[node->mt_whichplan]; junkfilter = resultRelInfo->ri_junkFilter; estate->es_result_relation_info = resultRelInfo; EvalPlanQualSetPlan(&node->mt_epqstate, subplanstate->plan, node->mt_arowmarks[node->mt_whichplan]); /* Prepare to convert transition tuples from this child. */ if (node->mt_transition_capture != NULL) { node->mt_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan); } if (node->mt_oc_transition_capture != NULL) { node->mt_oc_transition_capture->tcs_map = tupconv_map_for_subplan(node, node->mt_whichplan); } continue; } else break; } /* Ensure input tuple is the right format for the target relation.*/ if (node->mt_scans[node->mt_whichplan]->tts_ops != planSlot->tts_ops) { ExecCopySlot(node->mt_scans[node->mt_whichplan], planSlot); planSlot = node->mt_scans[node->mt_whichplan]; } /* If resultRelInfo->ri_usesFdwDirectModify is true, all we need to do here is compute the RETURNING expressions.*/ if (resultRelInfo->ri_usesFdwDirectModify) { Assert(resultRelInfo->ri_projectReturning); slot = ExecProcessReturning(resultRelInfo->ri_projectReturning, RelationGetRelid(resultRelInfo->ri_RelationDesc), NULL, planSlot); estate->es_result_relation_info = saved_resultRelInfo; return slot; } EvalPlanQualSetSlot(&node->mt_epqstate, planSlot); slot = planSlot; tupleid = NULL; oldtuple = NULL; if (junkfilter != NULL) { /* extract the 'ctid' or 'wholerow' junk attribute.*/ if (operation == CMD_UPDATE || operation == CMD_DELETE) { char relkind; Datum datum; bool isNull; relkind = resultRelInfo->ri_RelationDesc->rd_rel->relkind; if (relkind == RELKIND_RELATION || relkind == RELKIND_MATVIEW) { datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "ctid is NULL"); tupleid = (ItemPointer) DatumGetPointer(datum); tuple_ctid = *tupleid; /* be sure we don't free ctid!! */ tupleid = &tuple_ctid; } /* Use the wholerow attribute, when available, to reconstruct the old relation tuple.*/ else if (AttributeNumberIsValid(junkfilter->jf_junkAttNo)) { datum = ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull); /* shouldn't ever get a null result... */ if (isNull) elog(ERROR, "wholerow is NULL"); oldtupdata.t_data = DatumGetHeapTupleHeader(datum); oldtupdata.t_len = HeapTupleHeaderGetDatumLength(oldtupdata.t_data); ItemPointerSetInvalid(&(oldtupdata.t_self)); /* Historically, view triggers see invalid t_tableOid. */ oldtupdata.t_tableOid = (relkind == RELKIND_VIEW) ? InvalidOid : RelationGetRelid(resultRelInfo->ri_RelationDesc); oldtuple = &oldtupdata; } else Assert(relkind == RELKIND_FOREIGN_TABLE); } /* apply the junkfilter if needed. */ if (operation != CMD_DELETE) slot = ExecFilterJunk(junkfilter, slot); } switch (operation) { case CMD_INSERT: if (proute) /* Prepare for tuple routing if needed. */ slot = ExecPrepareTupleRouting(node, estate, proute, resultRelInfo, slot); slot = ExecInsert(node, slot, planSlot, NULL, estate->es_result_relation_info, estate, node->canSetTag); if (proute) /* Revert ExecPrepareTupleRouting's state change. */ estate->es_result_relation_info = resultRelInfo; break; case CMD_UPDATE: slot = ExecUpdate(node, tupleid, oldtuple, slot, planSlot, &node->mt_epqstate, estate, node->canSetTag); break; case CMD_DELETE: slot = ExecDelete(node, tupleid, oldtuple, planSlot, &node->mt_epqstate, estate, true, node->canSetTag, false /* changingPart */ , NULL, NULL); break; default: elog(ERROR, "unknown operation"); break; } /* If we got a RETURNING result, return it to caller. We'll continue the work on next call.*/ if (slot) { estate->es_result_relation_info = saved_resultRelInfo; return slot; } } estate->es_result_relation_info = saved_resultRelInfo; /* Restore es_result_relation_info before exiting */ fireASTriggers(node); /* We're done, but fire AFTER STATEMENT triggers before exiting.*/ node->mt_done = true; return NULL; }
我们看一下具体执行Update的实现
static TupleTableSlot * ExecUpdate(ModifyTableState *mtstate, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, TupleTableSlot *planSlot, EPQState *epqstate, EState *estate, bool canSetTag) { ResultRelInfo *resultRelInfo; Relation resultRelationDesc; TM_Result result; TM_FailureData tmfd; List *recheckIndexes = NIL; TupleConversionMap *saved_tcs_map = NULL; /* abort the operation if not running transactions*/ if (IsBootstrapProcessingMode()) elog(ERROR, "cannot UPDATE during bootstrap"); ExecMaterializeSlot(slot); /* get information on the (current) result relation*/ resultRelInfo = estate->es_result_relation_info; resultRelationDesc = resultRelInfo->ri_RelationDesc; /* BEFORE ROW UPDATE Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_before_row) { if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo, tupleid, oldtuple, slot)) return NULL; /* "do nothing" */ } /* INSTEAD OF ROW UPDATE Triggers */ if (resultRelInfo->ri_TrigDesc && resultRelInfo->ri_TrigDesc->trig_update_instead_row) { if (!ExecIRUpdateTriggers(estate, resultRelInfo, oldtuple, slot)) return NULL; /* "do nothing" */ } else if (resultRelInfo->ri_FdwRoutine) { /* Compute stored generated columns*/ if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored) ExecComputeStoredGenerated(estate, slot, CMD_UPDATE); /* update in foreign table: let the FDW do it*/ slot = resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate, resultRelInfo, slot, planSlot); if (slot == NULL) /* "do nothing" */ return NULL; /* AFTER ROW Triggers or RETURNING expressions might reference the * tableoid column, so (re-)initialize tts_tableOid before evaluating them. */ slot->tts_tableOid = RelationGetRelid(resultRelationDesc); } else { LockTupleMode lockmode; bool partition_constraint_failed; bool update_indexes; /* Constraints might reference the tableoid column, so (re-)initialize * tts_tableOid before evaluating them.*/ slot->tts_tableOid = RelationGetRelid(resultRelationDesc); /* Compute stored generated columns*/ if (resultRelationDesc->rd_att->constr && resultRelationDesc->rd_att->constr->has_generated_stored) ExecComputeStoredGenerated(estate, slot, CMD_UPDATE); /* * Check any RLS UPDATE WITH CHECK policies * * If we generate a new candidate tuple after EvalPlanQual testing, we * must loop back here and recheck any RLS policies and constraints. * (We don't need to redo triggers, however. If there are any BEFORE * triggers then trigger.c will have done table_tuple_lock to lock the * correct tuple, so there's no need to do them again.) */ lreplace:; /* ensure slot is independent, consider e.g. EPQ */ ExecMaterializeSlot(slot); /* If partition constraint fails, this row might get moved to another * partition, in which case we should check the RLS CHECK policy just * before inserting into the new partition, rather than doing it here. * This is because a trigger on that partition might again change the * row. So skip the WCO checks if the partition constraint fails. */ partition_constraint_failed = resultRelInfo->ri_PartitionCheck && !ExecPartitionCheck(resultRelInfo, slot, estate, false); if (!partition_constraint_failed && resultRelInfo->ri_WithCheckOptions != NIL) { /* ExecWithCheckOptions() will skip any WCOs which are not of the kind we are looking for at this point. */ ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK, resultRelInfo, slot, estate); } /* If a partition check failed, try to move the row into the right partition.*/ if (partition_constraint_failed) { bool tuple_deleted; TupleTableSlot *ret_slot; TupleTableSlot *orig_slot = slot; TupleTableSlot *epqslot = NULL; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; int map_index; TupleConversionMap *tupconv_map; /* Disallow an INSERT ON CONFLICT DO UPDATE that causes the * original row to migrate to a different partition. Maybe this * can be implemented some day, but it seems a fringe feature with * little redeeming value.*/ if (((ModifyTable *) mtstate->ps.plan)->onConflictAction == ONCONFLICT_UPDATE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("invalid ON UPDATE specification"), errdetail("The result tuple would appear in a different partition than the original tuple."))); /* When an UPDATE is run on a leaf partition, we will not have * partition tuple routing set up. In that case, fail with * partition constraint violation error.*/ if (proute == NULL) ExecPartitionCheckEmitError(resultRelInfo, slot, estate); /* Row movement, part 1. Delete the tuple, but skip RETURNING * processing. We want to return rows from INSERT.*/ ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate, false, false /* canSetTag */ , true /* changingPart */ , &tuple_deleted, &epqslot); /* For some reason if DELETE didn't happen (e.g. trigger prevented * it, or it was already deleted by self, or it was concurrently * deleted by another transaction), then we should skip the insert * as well; otherwise, an UPDATE could cause an increase in the * total number of rows across all partitions, which is clearly wrong. * * For a normal UPDATE, the case where the tuple has been the * subject of a concurrent UPDATE or DELETE would be handled by * the EvalPlanQual machinery, but for an UPDATE that we've * translated into a DELETE from this partition and an INSERT into * some other partition, that's not available, because CTID chains * can't span relation boundaries. We mimic the semantics to a * limited extent by skipping the INSERT if the DELETE fails to * find a tuple. This ensures that two concurrent attempts to * UPDATE the same tuple at the same time can't turn one tuple * into two, and that an UPDATE of a just-deleted tuple can't resurrect it.*/ if (!tuple_deleted) { /* * epqslot will be typically NULL. But when ExecDelete() * finds that another transaction has concurrently updated the * same row, it re-fetches the row, skips the delete, and * epqslot is set to the re-fetched tuple slot. In that case, * we need to do all the checks again. */ if (TupIsNull(epqslot)) return NULL; else { slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot); goto lreplace; } } /* Updates set the transition capture map only when a new subplan * is chosen. But for inserts, it is set for each row. So after * INSERT, we need to revert back to the map created for UPDATE; * otherwise the next UPDATE will incorrectly use the one created * for INSERT. So first save the one created for UPDATE. */ if (mtstate->mt_transition_capture) saved_tcs_map = mtstate->mt_transition_capture->tcs_map; /* resultRelInfo is one of the per-subplan resultRelInfos. So we * should convert the tuple into root's tuple descriptor, since * ExecInsert() starts the search from root. The tuple conversion * map list is in the order of mtstate->resultRelInfo[], so to * retrieve the one for this resultRel, we need to know the * position of the resultRel in mtstate->resultRelInfo[]. */ map_index = resultRelInfo - mtstate->resultRelInfo; Assert(map_index >= 0 && map_index < mtstate->mt_nplans); tupconv_map = tupconv_map_for_subplan(mtstate, map_index); if (tupconv_map != NULL) slot = execute_attr_map_slot(tupconv_map->attrMap, slot, mtstate->mt_root_tuple_slot); /* Prepare for tuple routing, making it look like we're inserting into the root. */ Assert(mtstate->rootResultRelInfo != NULL); slot = ExecPrepareTupleRouting(mtstate, estate, proute, mtstate->rootResultRelInfo, slot); ret_slot = ExecInsert(mtstate, slot, planSlot, orig_slot, resultRelInfo, estate, canSetTag); /* Revert ExecPrepareTupleRouting's node change. */ estate->es_result_relation_info = resultRelInfo; if (mtstate->mt_transition_capture) { mtstate->mt_transition_capture->tcs_original_insert_tuple = NULL; mtstate->mt_transition_capture->tcs_map = saved_tcs_map; } return ret_slot; } /* Check the constraints of the tuple. We've already checked the * partition constraint above; however, we must still ensure the tuple * passes all other constraints, so we will call ExecConstraints() and * have it validate all remaining checks.*/ if (resultRelationDesc->rd_att->constr) ExecConstraints(resultRelInfo, slot, estate); /* replace the heap tuple * * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check * that the row to be updated is visible to that snapshot, and throw a * can't-serialize error if not. This is a special-case behavior * needed for referential integrity updates in transaction-snapshot mode transactions. */ result = table_tuple_update(resultRelationDesc, tupleid, slot, estate->es_output_cid, estate->es_snapshot, estate->es_crosscheck_snapshot, true /* wait for commit */ ,&tmfd, &lockmode, &update_indexes); switch (result) { case TM_SelfModified: /* The target tuple was already updated or deleted by the * current command, or by a later command in the current * transaction. The former case is possible in a join UPDATE * where multiple tuples join to the same target tuple. This * is pretty questionable, but Postgres has always allowed it: * we just execute the first update action and ignore * additional update attempts. * * The latter case arises if the tuple is modified by a * command in a BEFORE trigger, or perhaps by a command in a * volatile function used in the query. In such situations we * should not ignore the update, but it is equally unsafe to * proceed. We don't want to discard the original UPDATE * while keeping the triggered actions based on it; and we * have no principled way to merge this update with the * previous ones. So throwing an error is the only safe * course. * * If a trigger actually intends this type of interaction, it * can re-execute the UPDATE (assuming it can figure out how) * and then return NULL to cancel the outer update.*/ if (tmfd.cmax != estate->es_output_cid) ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be updated was already modified by an operation triggered by the current command"), errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); /* Else, already updated by self; nothing to do */ return NULL; case TM_Ok: break; case TM_Updated: { TupleTableSlot *inputslot; TupleTableSlot *epqslot; if (IsolationUsesXactSnapshot()) ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent update"))); /* Already know that we're going to need to do EPQ, so fetch tuple directly into the right slot. */ inputslot = EvalPlanQualSlot(epqstate, resultRelationDesc,resultRelInfo->ri_RangeTableIndex); result = table_tuple_lock(resultRelationDesc, tupleid, estate->es_snapshot,inputslot, estate->es_output_cid, lockmode, LockWaitBlock, TUPLE_LOCK_FLAG_FIND_LAST_VERSION,&tmfd); switch (result) { case TM_Ok: Assert(tmfd.traversed); epqslot = EvalPlanQual(epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex, inputslot); if (TupIsNull(epqslot)) /* Tuple not passing quals anymore, exiting... */ return NULL; slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot); goto lreplace; case TM_Deleted: /* tuple already deleted; nothing to do */ return NULL; case TM_SelfModified: /* * This can be reached when following an update chain from a tuple updated by another session, * reaching a tuple that was already updated in this transaction. If previously modified by * this command, ignore the redundant update, otherwise error out. * * See also TM_SelfModified response to table_tuple_update() above.*/ if (tmfd.cmax != estate->es_output_cid) ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), errmsg("tuple to be updated was already modified by an operation triggered by the current command"),errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); return NULL; default: /* see table_tuple_lock call in ExecDelete() */ elog(ERROR, "unexpected table_tuple_lock status: %u", result); return NULL; } } break; case TM_Deleted: if (IsolationUsesXactSnapshot()) ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("could not serialize access due to concurrent delete"))); /* tuple already deleted; nothing to do */ return NULL; default: elog(ERROR, "unrecognized table_tuple_update status: %u", result); return NULL; } /* insert index entries for tuple if necessary */ if (resultRelInfo->ri_NumIndices > 0 && update_indexes) recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL); } if (canSetTag) (estate->es_processed)++; /* AFTER ROW UPDATE Triggers */ ExecARUpdateTriggers(estate, resultRelInfo, tupleid, oldtuple, slot,recheckIndexes,mtstate->operation == CMD_INSERT ?mtstate->mt_oc_transition_capture : mtstate->mt_transition_capture); list_free(recheckIndexes); /* Check any WITH CHECK OPTION constraints from parent views. We are * required to do this after testing all constraints and uniqueness * violations per the SQL spec, so we do it after actually updating the * record in the heap and all indexes. * * ExecWithCheckOptions() will skip any WCOs which are not of the kind we * are looking for at this point. */ if (resultRelInfo->ri_WithCheckOptions != NIL) ExecWithCheckOptions(WCO_VIEW_CHECK, resultRelInfo, slot, estate); if (resultRelInfo->ri_projectReturning) /* Process RETURNING if present */ return ExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelationDesc),slot, planSlot); return NULL; }
再往下就是涉及到存储引擎的部分了,我们重点看一下其对外的接口输入参数。重点是这4个参数:
table_tuple_update
接口之上的。是我们编写ExecUpdate
以及ExecModifyTable
的基础。/* * Update a tuple. * Input parameters: * relation - table to be modified (caller must hold suitable lock) * otid - TID of old tuple to be replaced * slot - newly constructed tuple data to store * cid - update command ID (used for visibility test, and stored into cmax/cmin if successful) * crosscheck - if not InvalidSnapshot, also check old tuple against this * wait - true if should wait for any conflicting update to commit/abort * Output parameters: * tmfd - filled in failure cases (see below) * lockmode - filled with lock mode acquired on tuple * update_indexes - in success cases this is set to true if new index entries are required for this tuple * * Normal, successful return value is TM_Ok, which means we did actually update it. */ static inline TM_Result table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, bool *update_indexes) { return rel->rd_tableam->tuple_update(rel, otid, slot, cid, snapshot, crosscheck, wait, tmfd, lockmode, update_indexes); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。