The COPY command is the usual tool for bulk data import and export; for syntax and usage, see the post [Postgres] Bulk Insert and Export Data with csv Files with Postgres copy Command. The most common way to load data is a plain `INSERT` (`insert into t1 values (v1);`), which is also the slowest. Faster is a multi-row insert, where a single `INSERT` statement carries many rows (`insert into t1 values (v1), (v2), (v3), ...;`). Faster still is `COPY`. Let's walk through how COPY is implemented.
Before analyzing COPY itself, why is batch insert faster in the first place? The PostgreSQL source comments below spell this out. In addition, batch insert amortizes the cost of parsing, semantic analysis, and plan generation: those steps run once per statement instead of once per row. COPY goes one step further and skips plan generation entirely, constructing tuples in bulk and inserting them directly into the table.
```c
/*
 * Insert multiple tuples into a table.
 *
 * This is like table_tuple_insert(), but inserts multiple tuples in one
 * operation. That's often faster than calling table_tuple_insert() in a loop,
 * because e.g. the AM can reduce WAL logging and page locking overhead.
 */
static inline void
table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
                   CommandId cid, int options, struct BulkInsertStateData *bistate)
{
    rel->rd_tableam->multi_insert(rel, slots, nslots, cid, options, bistate);
}

/*
 * heap_multi_insert - insert multiple tuples into a heap
 *
 * This is like heap_insert(), but inserts multiple tuples in one operation.
 * That's faster than calling heap_insert() in a loop, because when multiple
 * tuples can be inserted on a single page, we can write just a single WAL
 * record covering all of them, and only need to lock/unlock the page once.
 */
void
heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
                  CommandId cid, int options, BulkInsertState bistate)
{
    // ...
}
```
Now let's look at the COPY source itself. The core code lives in /src/backend/commands/copy.c, /src/backend/commands/copyfrom.c, and /src/backend/commands/copyto.c.
The main flow:

```
exec_simple_query
--> pg_parse_query
    --> raw_parser
--> pg_analyze_and_rewrite
    --> parse_analyze
        --> transformStmt
--> pg_plan_queries     // COPY is a utility command, so no query optimization is
                        // needed; one reason it beats INSERT, which goes through
                        // full parsing and planning
--> PortalRun
    --> ProcessUtility
        --> standard_ProcessUtility
            --> DoCopy  // execute the COPY statement
```
The key data structure:
```c
/* ----------------------
 *		Copy Statement
 *
 * We support "COPY relation FROM file", "COPY relation TO file", and
 * "COPY (query) TO file".  In any given CopyStmt, exactly one of "relation"
 * and "query" must be non-NULL.
 * ----------------------
 */
typedef struct CopyStmt
{
	NodeTag		type;
	RangeVar   *relation;	/* the relation to copy */
							/* the key fields are the table name and the file path */
	Node	   *query;		/* the query (SELECT or DML statement with
							 * RETURNING) to copy, as a raw parse tree */
	List	   *attlist;	/* List of column names (as Strings), or NIL
							 * for all columns */
	bool		is_from;	/* TO or FROM */
	bool		is_program;	/* is 'filename' a program to popen? */
	char	   *filename;	/* filename, or NULL for STDIN/STDOUT */
	List	   *options;	/* List of DefElem nodes */
	Node	   *whereClause;	/* WHERE condition (or NULL) */
} CopyStmt;
```
The grammar for the COPY command, defined in gram.y:
```c
/*****************************************************************************
 *
 *		QUERY :
 *				COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
 *				COPY ( query ) TO file [WITH] [(options)]
 *
 *				where 'query' can be one of:
 *				{ SELECT | UPDATE | INSERT | DELETE }
 *
 *				and 'file' can be one of:
 *				{ PROGRAM 'command' | STDIN | STDOUT | 'filename' }
 *
 *				In the preferred syntax the options are comma-separated
 *				and use generic identifiers instead of keywords.  The pre-9.0
 *				syntax had a hard-wired, space-separated set of options.
 *
 *				Really old syntax, from versions 7.2 and prior:
 *				COPY [ BINARY ] table FROM/TO file
 *					[ [ USING ] DELIMITERS 'delimiter' ] ]
 *					[ WITH NULL AS 'null string' ]
 *				This option placement is not supported with COPY (query...).
 *
 *****************************************************************************/

CopyStmt:	COPY opt_binary qualified_name opt_column_list
			copy_from opt_program copy_file_name copy_delimiter opt_with
			copy_options where_clause
				{
					CopyStmt *n = makeNode(CopyStmt);

					n->relation = $3;
					n->query = NULL;
					n->attlist = $4;
					n->is_from = $5;
					n->is_program = $6;
					n->filename = $7;
					n->whereClause = $11;

					if (n->is_program && n->filename == NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("STDIN/STDOUT not allowed with PROGRAM"),
								 parser_errposition(@8)));

					if (!n->is_from && n->whereClause != NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("WHERE clause not allowed with COPY TO"),
								 parser_errposition(@11)));

					n->options = NIL;
					/* Concatenate user-supplied flags */
					if ($2)
						n->options = lappend(n->options, $2);
					if ($8)
						n->options = lappend(n->options, $8);
					if ($10)
						n->options = list_concat(n->options, $10);
					$$ = (Node *) n;
				}
			| COPY '(' PreparableStmt ')' TO opt_program copy_file_name
			opt_with copy_options
				{
					CopyStmt *n = makeNode(CopyStmt);

					n->relation = NULL;
					n->query = $3;
					n->attlist = NIL;
					n->is_from = false;
					n->is_program = $6;
					n->filename = $7;
					n->options = $9;

					if (n->is_program && n->filename == NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("STDIN/STDOUT not allowed with PROGRAM"),
								 parser_errposition(@5)));
					$$ = (Node *) n;
				}
		;

/* the rest is omitted; see gram.y for the full rules */
```
Here we only analyze COPY FROM; COPY TO works along the same lines. The core of the implementation is the DoCopy function. The flow below shows that COPY FROM skips parsing and optimization and goes straight into execution: it extracts data from the file, constructs tuples, buffers them in batches of up to 1000, and calls table_multi_insert to insert each batch.
```
DoCopy
--> BeginCopyFrom       // Setup to read tuples from a file for COPY FROM.
--> CopyFrom            /* copy from file to database */
    --> CreateExecutorState
    for (;;)
    {
        CopyMultiInsertInfoNextFreeSlot
            --> table_slot_create
        NextCopyFrom    // Directly store the values/nulls array in the slot
        ExecStoreVirtualTuple
        ExecMaterializeSlot
        CopyMultiInsertInfoStore    /* Add this tuple to the tuple buffer */

        /* If enough inserts have queued up, then flush all buffers out to
         * their tables. */
        if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
            // batch insert; the default batch size is 1000:
            // #define MAX_BUFFERED_TUPLES 1000
            CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
            --> CopyMultiInsertBufferFlush
                --> table_multi_insert
        // everything up to this point is data preparation: read rows from
        // the file and build tuples in bulk, then insert them in batches
    }
--> EndCopyFrom
```
The batch insert code itself deserves a separate analysis later; here we only list the rough flow.
```
heap_multi_insert
--> GetCurrentTransactionId
--> RelationNeedsWAL
for (i = 0; i < ntuples; i++)
{
    heap_prepare_insert
}
while (ndone < ntuples)
{
    buffer = RelationGetBufferForTuple  // get a buffer for a page whose free
                                        // space exceeds the tuple's size
    page = BufferGetPage(buffer);       // get the page from the buffer
    RelationPutHeapTuple                // place tuple at specified page
    --> PageAddItemExtended             // Add an item to a page
    MarkBufferDirty(buffer);
    if (needwal)
    {
        XLogBeginInsert();
        XLogRegisterData((char *) xlrec, tupledata - scratch.data);
        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
        XLogRegisterBufData(0, tupledata, totaldatalen);
        /* filtering by origin on a row level is much more efficient */
        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
        recptr = XLogInsert(RM_HEAP2_ID, info);
        PageSetLSN(page, recptr);
    }
}
```