
PostgreSQL Source Code Analysis: COPY

Source code analysis of the PostgreSQL copy command

Ways to import data

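The copy command is commonly used for importing and exporting data. For its syntax, see the blog post
[Postgres] Bulk Insert and Export Data with csv Files with Postgres copy Command. The usual way to import data is INSERT, one row per statement (insert into t1 values v1), which is also the slowest method. Faster than that is batch insertion, where a single INSERT inserts many rows (insert into t1 values v1,v2,v3,...). Faster still is COPY. Let's analyze how COPY is implemented.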

Before analyzing COPY, let's first understand why batch insertion is faster.

Why batch insertion is faster than single-row insertion

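The comments in the PostgreSQL source code explain this clearly: inserting many tuples in one call lets the access method reduce WAL logging and page locking overhead. In addition, batch insertion saves a lot of time in parsing, semantic analysis, and plan generation: instead of parsing and planning one INSERT per row, this happens only once for the whole batch. COPY goes one step further and skips plan generation entirely, building tuples directly and inserting them into the table in batches.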

/*
 * Insert multiple tuples into a table.
 *
 * This is like table_tuple_insert(), but inserts multiple tuples in one
 * operation. That's often faster than calling table_tuple_insert() in a loop,
 * because e.g. the AM can reduce WAL logging and page locking overhead.
 */
static inline void
table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots,
				   CommandId cid, int options, struct BulkInsertStateData *bistate)
{
	rel->rd_tableam->multi_insert(rel, slots, nslots, cid, options, bistate);
}

/*
 *	heap_multi_insert	- insert multiple tuples into a heap
 *
 * This is like heap_insert(), but inserts multiple tuples in one operation.
 * That's faster than calling heap_insert() in a loop, because when multiple
 * tuples can be inserted on a single page, we can write just a single WAL
 * record covering all of them, and only need to lock/unlock the page once.
 */
void
heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
				  CommandId cid, int options, BulkInsertState bistate)
{
	// ...
}

Next, let's continue with the COPY source code.

COPY source code analysis

The core COPY code is in /src/backend/commands/copy.c, /src/backend/commands/copyfrom.c and /src/backend/commands/copyto.c.

The main flow is as follows:

exec_simple_query
--> pg_parse_query
    --> raw_parser
--> pg_analyze_and_rewrite
    --> parse_analyze
        --> transformStmt
--> pg_plan_queries  // COPY is a utility command, so no query optimization is done; this is one reason it is faster than INSERT, which must go through full parsing and optimization
--> PortalRun
    --> ProcessUtility
        --> standard_ProcessUtility
            --> DoCopy  // 执行copy语句
Parser layer

Key data structure:

/* ----------------------
 *		Copy Statement
 *
 * We support "COPY relation FROM file", "COPY relation TO file", and
 * "COPY (query) TO file".  In any given CopyStmt, exactly one of "relation"
 * and "query" must be non-NULL.
 * ---------------------- */
typedef struct CopyStmt
{
	NodeTag		type;
	RangeVar   *relation;		/* the relation to copy */  // the most important fields are the table name and the file path
	Node	   *query;			/* the query (SELECT or DML statement with RETURNING) to copy, as a raw parse tree */
	List	   *attlist;		/* List of column names (as Strings), or NIL for all columns */
	bool		is_from;		/* TO or FROM */
	bool		is_program;		/* is 'filename' a program to popen? */
	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
	List	   *options;		/* List of DefElem nodes */
	Node	   *whereClause;	/* WHERE condition (or NULL) */
} CopyStmt;

Grammar definition of the COPY command in gram.y:

/*****************************************************************************
 *
 *		QUERY :
 *				COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
 *				COPY ( query ) TO file	[WITH] [(options)]
 *
 *				where 'query' can be one of:
 *				{ SELECT | UPDATE | INSERT | DELETE }
 *
 *				and 'file' can be one of:
 *				{ PROGRAM 'command' | STDIN | STDOUT | 'filename' }
 *
 *				In the preferred syntax the options are comma-separated
 *				and use generic identifiers instead of keywords.  The pre-9.0
 *				syntax had a hard-wired, space-separated set of options.
 *
 *				Really old syntax, from versions 7.2 and prior:
 *				COPY [ BINARY ] table FROM/TO file
 *					[ [ USING ] DELIMITERS 'delimiter' ] ]
 *					[ WITH NULL AS 'null string' ]
 *				This option placement is not supported with COPY (query...).
 *
 *****************************************************************************/

CopyStmt:	COPY opt_binary qualified_name opt_column_list
			copy_from opt_program copy_file_name copy_delimiter opt_with
			copy_options where_clause
				{
					CopyStmt *n = makeNode(CopyStmt);
					n->relation = $3;
					n->query = NULL;
					n->attlist = $4;
					n->is_from = $5;
					n->is_program = $6;
					n->filename = $7;
					n->whereClause = $11;

					if (n->is_program && n->filename == NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("STDIN/STDOUT not allowed with PROGRAM"),
								 parser_errposition(@8)));

					if (!n->is_from && n->whereClause != NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("WHERE clause not allowed with COPY TO"),
								 parser_errposition(@11)));

					n->options = NIL;
					/* Concatenate user-supplied flags */
					if ($2)
						n->options = lappend(n->options, $2);
					if ($8)
						n->options = lappend(n->options, $8);
					if ($10)
						n->options = list_concat(n->options, $10);
					$$ = (Node *)n;
				}
			| COPY '(' PreparableStmt ')' TO opt_program copy_file_name opt_with copy_options
				{
					CopyStmt *n = makeNode(CopyStmt);
					n->relation = NULL;
					n->query = $3;
					n->attlist = NIL;
					n->is_from = false;
					n->is_program = $6;
					n->filename = $7;
					n->options = $9;

					if (n->is_program && n->filename == NULL)
						ereport(ERROR,
								(errcode(ERRCODE_SYNTAX_ERROR),
								 errmsg("STDIN/STDOUT not allowed with PROGRAM"),
								 parser_errposition(@5)));

					$$ = (Node *)n;
				}
		;

    // the rest is omitted; see gram.y ...
Executor layer

Only COPY FROM is analyzed here; COPY TO works in a similar way.

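The core of COPY is the DoCopy function. Its main flow is shown below. COPY FROM does not parse or plan the incoming rows as SQL: it goes straight to execution, reads the data from the file, builds tuples, buffers them in batches of 1000 tuples, and then inserts each batch with table_multi_insert.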

DoCopy
--> BeginCopyFrom  // Setup to read tuples from a file for COPY FROM.
--> CopyFrom       /* copy from file to database */
    --> CreateExecutorState
    for (;;)
    {
        CopyMultiInsertInfoNextFreeSlot
        --> table_slot_create
        NextCopyFrom  // Directly store the values/nulls array in the slot; leave the loop when there is no more input
        ExecStoreVirtualTuple
        ExecMaterializeSlot
        CopyMultiInsertInfoStore  /* Add this tuple to the tuple buffer */
        /* If enough inserts have queued up, then flush all buffers out to their tables. */
        if (CopyMultiInsertInfoIsFull(&multiInsertInfo))  // insert in batches; the default batch size is 1000 (#define MAX_BUFFERED_TUPLES 1000)
            CopyMultiInsertInfoFlush(&multiInsertInfo, resultRelInfo);
            --> CopyMultiInsertBufferFlush
                --> table_multi_insert  // everything before this is data preparation: read rows from the file, build tuples in batches, then insert them into the table in bulk
    }
    /* Flush any remaining buffered tuples */
    CopyMultiInsertInfoFlush
--> EndCopyFrom

The batch-insert code will be analyzed in a later post; only its rough flow is listed here.

heap_multi_insert
--> GetCurrentTransactionId
--> RelationNeedsWAL
    for (i = 0; i < ntuples; i++)
    {
        heap_prepare_insert
    }

    while (ndone < ntuples)
    {
        buffer = RelationGetBufferForTuple  // get a buffer for a table page whose free space is large enough for the tuple being inserted
        page = BufferGetPage(buffer);       // get the page held in the buffer
        RelationPutHeapTuple                // place tuple at specified page
        --> PageAddItemExtended             // Add an item to a page
        // then keep calling RelationPutHeapTuple for the following tuples while they still fit on this page (nthispage counts them)

        MarkBufferDirty(buffer);

        if (needwal)
        {
            XLogBeginInsert();
            XLogRegisterData((char *) xlrec, tupledata - scratch.data);
            XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);

            XLogRegisterBufData(0, tupledata, totaldatalen);

            /* filtering by origin on a row level is much more efficient */
            XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

            recptr = XLogInsert(RM_HEAP2_ID, info);  // one WAL record covers all tuples placed on this page

            PageSetLSN(page, recptr);
        }

        UnlockReleaseBuffer(buffer);        // the page is locked and unlocked once per batch, not once per tuple
        ndone += nthispage;
    }
This article was contributed by a user. When reposting, please credit the source: https://www.wpsshop.cn/w/神奇cpp/article/detail/758539