赞
踩
1 数据导入
导入(Load)功能就是将用户的原始数据导入到 Doris 中。导入成功后,用户即可通过 Mysql 客户端查询数据。为适配不同的数据导入需求,Doris 系统提供了6种不同的导入方式。每种导入方式支持不同的数据源,存在不同的使用方式(异步,同步)。
所有导入方式都支持 csv 数据格式。其中Broker load 还支持parquet和orc 数据格式。
1.1 Broker Load
1.1.1 适用场景
源数据在Broker可以访问的存储系统中,如HDFS。
数据量在几十到百GB级别。
1.1.2 基本原理
用户在提交导入任务后,FE会生成对应的Plan并根据目前BE的个数和文件的大小,将 Plan分给多个BE执行,每个BE执行一部分导入数据。
BE在执行的过程中会从Broker拉取数据,在对数据transform之后将数据导入系统。所有BE均完成导入,由FE最终决定导入是否成功。
1.1.3 基本语法
LOAD LABEL db_name.label_name
(data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]
* data_desc:
DATA INFILE ('file_path', ...)
[NEGATIVE]
INTO TABLE tbl_name
[PARTITION (p1, p2)]
[COLUMNS TERMINATED BY separator ]
[(col1, ...)]
[PRECEDING FILTER predicate]
[SET (k1=f1(xx), k2=f2(xx))]
[WHERE predicate]
* broker_properties:
(key1=value1, ...)
创建导入的详细语法执行 HELP BROKER LOAD 查看语法帮助。这里主要介绍 Broker load 的创建导入语法中参数意义和注意事项。
导入作业参数主要指的是 Broker load 创建导入语句中的属于 opt_properties部分的参数。导入作业参数是作用于整个导入作业的。
下面主要对导入作业参数的部分参数详细解释:
1.1.4 导入示例
1.1.5 查看导入
Broker load 导入方式由于是异步的,所以用户必须将创建导入的 Label 记录,并且在查看导入命令中使用 Label 来查看导入结果。查看导入命令在所有导入方式中是通用的,具体语法可执行 HELP SHOW LOAD 查看_大数据培训。
mysql> show load order by createtime desc limit 1\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
下面主要介绍了查看导入命令返回结果集中参数意义:
1.1.6 取消导入
当 Broker load 作业状态不为 CANCELLED 或 FINISHED 时,可以被用户手动取消。取消时需要指定待取消导入任务的 Label 。取消导入命令语法可执行 HELP CANCEL LOAD查看。
CANCEL LOAD
[FROM db_name]
WHERE LABEL=”load_label”;
1.2 Stream Load
Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中。Stream load 同步执行导入并返回导入结果。用户可直接通过请求的返回体判断本次导入是否成功。
1.2.1 适用场景
Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。
目前Stream Load支持两个数据格式:CSV(文本)和JSON。
1.2.2 基本原理
下图展示了Stream load 的主要流程,省略了一些导入细节。
Stream load 中,Doris 会选定一个节点作为 Coordinator 节点。该节点负责接数据并分发数据到其他数据节点。
用户通过 HTTP 协议提交导入命令。如果提交到 FE,则 FE 会通过 HTTP redirect 指令将请求转发给某一个 BE。用户也可以直接提交导入命令给某一指定 BE。
导入的最终结果由 Coordinator BE 返回给用户。
1.2.3 基本语法
Stream load 通过 HTTP 协议提交和传输数据。这里通过 curl 命令展示如何提交导入。
用户也可以通过其他 HTTP client 进行操作。
curl --location-trusted -u user:passwd [-H ""...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
Header中支持属性见下面的 ‘导入任务参数’说明。
格式为:-H "key1:value1"
创建导入的详细语法帮助执行HELP STREAM LOAD查看, 下面主要介绍创建Stream load的部分参数意义。
user/passwd
Stream load 由于创建导入的协议使用的是 HTTP 协议,通过 Basic access authentication 进行签名。Doris 系统会根据签名验证用户身份和导入权限。
导入任务参数Stream load 由于使用的是 HTTP 协议,所以所有导入任务有关的参数均设置在 Header 中。下面主要介绍了 Stream load 导入任务参数的部分参数意义。
Label
导入任务的标识
column_separator用于指定导入文件中的列分隔符,默认为\t。如果是不可见字符,则需要加\x作为前缀,使用十六进制来表示分隔符。如hive文件的分隔符\x01,需要指定为-H "column_separator:\x01"。可以使用多个字符的组合作为列分隔符。
line_delimiter用于指定导入文件中的换行符,默认为\n。可以使用做多个字符的组合作为换行符。
max_filter_ratio导入任务的最大容忍率
where导入任务指定的过滤条件。Stream load 支持对原始数据指定 where 语句进行过滤。被过滤的数据将不会被导入,也不会参与 filter ratio 的计算,但会被计入num_rows_unselected。
partition待导入表的 Partition 信息,如果待导入数据不属于指定的 Partition 则不会被导入。这些数据将计入 dpp.abnorm.ALL
columns待导入数据的函数变换配置,目前 Stream load 支持的函数变换方法包含列的顺序变化以及表达式变换,其中表达式变换的方法与查询语句的一致。
列顺序变换例子:原始数据有三列(src_c1,src_c2,src_c3), 目前doris表也有三列(dst_c1,dst_c2,dst_c3)如果原始表的src_c1列对应目标表dst_c1列,原始表的src_c2列对应目标表dst_c2列,原始表的src_c3列对应目标表dst_c3列,则写法如下:columns: dst_c1, dst_c2, dst_c3如果原始表的src_c1列对应目标表dst_c2列,原始表的src_c2列对应目标表dst_c3列,原始表的src_c3列对应目标表dst_c1列,则写法如下:columns: dst_c2, dst_c3, dst_c1表达式变换例子:原始文件有两列,目标表也有两列(c1,c2)但是原始文件的两列均需要经过函数变换才能对应目标表的两列,则写法如下:columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)其中 tmp_*是一个占位符,代表的是原始文件中的两个原始列。
exec_mem_limit导入内存限制。默认为 2GB,单位为字节。
strict_mode
two_phase_commitStream load 导入可以开启两阶段事务提交模式。开启方式为在 HEADER 中声明 two_phase_commit=true 。
默认的两阶段批量事务提交为关闭。两阶段批量事务提交模式的意思是:Stream load过程中,数据写入完成即会返回信息给用户,此时数据不可见,事务状态为PRECOMMITTED,用户手动触发commit操作之后,数据才可见。
用户可以调用如下接口对stream load事务触发commit操作:curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
或
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:commit" http://be_host:webserver_port/api/{db}/_stream_load_2pc
用户可以调用如下接口对stream load事务触发abort操作:curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
或
curl -X PUT --location-trusted -u user:passwd -H "txn_id:txnId" -H "txn_operation:abort" http://be_host:webserver_port/api/{db}/_stream_load_2pc
1.2.4 导入示例
curl --location-trusted -u root -H "label:123" -H"column_separator:," -T student.csv -X PUT http://hadoop1:8030/api/test_db/student_result/_stream_load
由于 Stream load 是一种同步的导入方式,所以导入的结果会通过创建导入的返回值直接返回给用户。
注意:由于 Stream load 是同步的导入方式,所以并不会在 Doris 系统中记录导入信息,用户无法异步的通过查看导入命令看到 Stream load。使用时需监听创建导入请求的返回值获取导入结果。
1.2.5 取消导入
用户无法手动取消 Stream load,Stream load 在超时或者导入错误后会被系统自动取消。
Stream Load是一个同步的导入方式,用户通过发送HTTP协议将本地文件或数据流导入到Doris中,Stream load同步执行导入并返回结果。用户可以直接通过返回判断导入是否成功。
1.3 Routine Load
例行导入(Routine Load)功能为用户提供了一种自动从指定数据源进行数据导入的功能。
1.3.1 适用场景
当前仅支持从Kafka系统进行例行导入,使用限制:
1.3.2 基本原理
如上图,Client 向 FE 提交一个例行导入作业。
1.3.3 基本语法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
执行 HELP ROUTINE LOAD 可以查看语法帮助,下面是参数说明
1.3.4 Kafka 导入示例
1.3.5 查看导入作业状态
查看作业状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD; 命令查看。
查看任务运行状态的具体命令和示例可以通过 HELP SHOW ROUTINE LOAD TASK; 命令查看。
只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。
1.3.6 修改作业属性
用户可以修改已经创建的作业。具体说明可以通过 HELP ALTER ROUTINE LOAD; 命令查看。或参阅ALTER ROUTINE LOAD。
1.3.7 作业控制
用户可以通过 STOP/PAUSE/RESUME三个命令来控制作业的停止,暂停和重启。可以通过HELP STOP ROUTINE LOAD; HELP PAUSE ROUTINE LOAD; 以及 HELP RESUME ROUTINE LOAD; 三个命令查看帮助和示例。
1.3.8 其他说明
1.例行导入作业和 ALTER TABLE 操作的关系例行导入不会阻塞 SCHEMA CHANGE 和 ROLLUP 操作。但是注意如果 SCHEMA CHANGE 完成后,列映射关系无法匹配,则会导致作业的错误数据激增,最终导致作业暂停。建议通过在例行导入作业中显式指定列映射关系,以及通过增加 Nullable 列或带 Default 值的列来减少这类问题。删除表的 Partition 可能会导致导入数据无法找到对应的 Partition,作业进入暂停。
2.例行导入作业和其他导入作业的关系(LOAD, DELETE, INSERT)例行导入和其他 LOAD 作业以及 INSERT 操作没有冲突。当执行 DELETE 操作时,对应表分区不能有任何正在执行的导入任务。所以在执行 DELETE 操作前,可能需要先暂停例行导入作业,并等待已下发的 task 全部完成后,才可以执行 DELETE。
3.例行导入作业和 DROP DATABASE/TABLE 操作的关系当例行导入对应的 database 或 table 被删除后,作业会自动 CANCEL。
4.kafka 类型的例行导入作业和 kafka topic 的关系当用户在创建例行导入声明的 kafka_topic 在kafka集群中不存在时:所以,如果用户希望当 kafka topic 不存在的时候,被例行作业自动创建的话,只需要将用户方的kafka集群中的 broker 设置 auto.create.topics.enable = true 即可。
5.在网络隔离的环境中可能出现的问题在有些环境中存在网段和域名解析的隔离措施,所以需要注意:
6.关于指定消费的 Partition 和 Offset
Doris支持指定Partition和Offset开始消费。新版中还支持了指定时间点进行消费的功能。这里说明下对应参数的配置关系。
有三个相关参数:
在创建导入作业时,这三个参数可以有以下组合:
7.STOP和PAUSE的区别
FE会自动定期清理STOP状态的ROUTINE LOAD,而PAUSE状态的则可以再次被恢复启用。
1.4 Binlog Load
Binlog Load提供了一种使Doris增量同步用户在Mysql数据库的对数据更新操作的CDC(Change Data Capture)功能。
1.4.1 适用场景
INSERT/UPDATE/DELETE支持。
过滤Query。
暂不兼容DDL语句。
1.4.2 基本原理
在第一期的设计中,Binlog Load需要依赖canal作为中间媒介,让canal伪造成一个从节点去获取Mysql主节点上的Binlog并解析,再由Doris去获取Canal上解析好的数据,主要涉及Mysql端、Canal端以及Doris端,总体数据流向如下:
如上图,用户向FE提交一个数据同步作业。
1.4.3 配置MySQL端
在MySQL Cluster模式的主从同步中,二进制日志文件(Binlog)记录了主节点上的所有数据变化,数据在Cluster的多个节点间同步、备份都要通过Binlog日志进行,从而提高集群的可用性。架构通常由一个主节点(负责写)和一个或多个从节点(负责读)构成,所有在主节点上发生的数据变更将会复制给从节点。
注意:目前必须要使用Mysql 5.7及以上的版本才能支持Binlog Load功能。
1.4.4 配置Canal端
Canal是属于阿里巴巴otter项目下的一个子项目,主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费,用于解决跨机房同步的业务场景,建议使用canal 1.1.5及以上版本。
下载地址:https://github.com/alibaba/canal/releases
1.4.5 配置目标表
1.4.6 基本语法
创建数据同步作业的的详细语法可以连接到 Doris 后,执行 HELP CREATE SYNC JOB; 查看语法帮助。
CREATE SYNC [db.]job_name
(
channel_desc,
channel_desc
...
)
binlog_desc
1.4.7 示例
作业提交之后状态为PENDING,由FE调度执行启动canal client后状态变成RUNNING,用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和恢复,操作后作业状态分别为CANCELLED/PAUSED/RUNNING。作业的最终阶段只有一个CANCELLED,当作业状态变为CANCELLED后,将无法再次恢复。当作业发生了错误时,若错误是不可恢复的,状态会变成CANCELLED,否则会变成PAUSED。
1.5 Insert Into
Insert Into 语句的使用方式和 MySQL 等数据库中 Insert Into 语句的使用方式类似。但在 Doris 中,所有的数据写入都是一个独立的导入作业。所以这里将 Insert Into 也作为一种导入方式介绍。
主要的 Insert Into 命令包含以下两种:
INSERT INTO tbl SELECT ...
INSERT INTO tbl (col1, col2, ...) VALUES (1, 2, ...), (1,3, ...);
其中第二种命令仅用于 Demo,不要使用在测试或生产环境中。
Insert Into 命令需要通过 MySQL 协议提交,创建导入请求会同步返回导入结果。
1.5.1 语法
INSERT INTO table_name [partition_info] [WITH LABEL label] [col_list] [query_stmt] [VALUES];
WITH LABEL:
INSERT 操作作为一个导入任务,也可以指定一个 label。如果不指定,则系统会自动指定一个 UUID 作为 label。
该功能需要 0.11+ 版本。
注意:建议指定 Label 而不是由系统自动分配。如果由系统自动分配,但在 Insert Into 语句执行过程中,因网络错误导致连接断开等,则无法得知 Insert Into 是否成功。而如果指定 Label,则可以再次通过 Label 查看任务结果。
示例:
INSERT INTO tbl2 WITH LABEL label1 SELECT * FROM tbl3;
INSERT INTO tbl1 VALUES ("qweasdzxcqweasdzxc"), ("a");
注意:当需要使用 CTE(Common Table Expressions) 作为 insert 操作中的查询部分时,必须指定 WITH LABEL 和 column list 部分。
示例:
INSERT INTO tbl1 WITH LABEL label1
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
INSERT INTO tbl1 (k1)
WITH cte1 AS (SELECT * FROM tbl1), cte2 AS (SELECT * FROM tbl2)
SELECT k1 FROM cte1 JOIN cte2 WHERE cte1.k1 = 1;
1.5.2 SHOW LAST INSERT
一些语言的MySQL类库中很难获取返回结果的中的json字符串。因此,Doris 还提供了 SHOW LAST INSERT 命令来显式的获取最近一次 insert 操作的结果。
当执行完一个insert操作后,可以在同一session连接中执行SHOW LAST INSERT。该命令会返回最近一次insert 操作的结果,如:
mysql> show last insert\G
*************************** 1. row ***************************
TransactionId: 64067
Label: insert_ba8f33aea9544866-8ed77e2844d0cc9b
Database: default_cluster:db1
Table: t1
TransactionStatus: VISIBLE
LoadedRows: 2
FilteredRows: 0
该命令会返回insert以及对应事务的详细信息。因此,用户可以在每次执行完insert操作后,继续执行show last insert 命令来获取insert的结果。
注意:该命令只会返回在同一session连接中,最近一次insert操作的结果。如果连接断开或更换了新的连接,则将返回空集。
1.6 S3 Load
参考官网:https://doris.apache.org/zh-CN/administrator-guide/load-data/s3-load-manual.html
2 数据导出
2.1 Export导出
数据导出是Doris提供的一种将数据导出的功能。该功能可以将用户指定的表或分区的数据以文本的格式,通过Broker进程导出到远端存储上,如HDFS/BOS等。
2.1.1 基本原理
用户提交一个 Export 作业后。Doris 会统计这个作业涉及的所有 Tablet。然后对这些 Tablet 进行分组,每组生成一个特殊的查询计划。该查询计划会读取所包含的 Tablet 上的数据,然后通过 Broker 将数据写到远端存储指定的路径中,也可以通过S3协议直接导出到支持S3协议的远端存储上。
Export 作业会生成多个查询计划,每个查询计划负责扫描一部分 Tablet。每个查询计划扫描的 Tablet 个数由 FE 配置参数 export_tablet_num_per_task 指定,默认为 5。即假设一共 100 个 Tablet,则会生成 20 个查询计划。用户也可以在提交作业时,通过作业属性 tablet_num_per_task 指定这个数值。
2.1.2 基本语法
Export 的详细命令可以通过 HELP EXPORT查看:
EXPORT TABLE db1.tbl1
PARTITION (p1,p2)
[WHERE [expr]]
TO "hdfs://host/path/to/export/"
PROPERTIES
(
"label" = "mylabel",
"column_separator"=",",
"columns" = "col1,col2",
"exec_mem_limit"="2147483648",
"timeout" = "3600"
)
WITH BROKER "hdfs"
(
"username" = "user",
"password" = "passwd"
);
2.1.3 导出示例
2.1.4 查询导出作业状态
提交作业后,可以通过 SHOW EXPORT 命令查询导出作业状态。结果举例如下:
JobId: 14008
Label: mylabel
State: FINISHED
Progress: 100%
TaskInfo: {"partitions":["*"],"exec mem limit":2147483648,"column separator":",","line delimiter":"\n","tablet num":1,"broker":"hdfs","coord num":1,"db":"default_cluster:db1","tbl":"tbl3"}
Path: bos://bj-test-cmy/export/
CreateTime: 2019-06-25 17:08:24
StartTime: 2019-06-25 17:08:28
FinishTime: 2019-06-25 17:08:34
Timeout: 3600
ErrorMsg: N/A
2.1.5 注意事项
2.2 查询结果导出
SELECT INTO OUTFILE 语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 通过 S3 协议, 或直接通过 HDFS 协议,导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。
2.2.1 语法
语法如下
query_stmt
INTO OUTFILE "file_path"
[format_as]
[properties]
2.2.2 并发导出
2.2.3 使用示例
文章转载来源于大数据那些事,作者尚硅谷陈老师
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。