赞
踩
Apache Doris 代码仓库地址:apache/incubator-doris 欢迎大家关注加星
- CREATE TABLE `t_pro_dish_list_detail_test` (
- `order_time` date,
- `shop_id` varchar(32) ,
- `id` varchar(32) ,
- `table_bill_id` varchar(36) ,
- `shop_name` varchar(100) ,
- `dish_type` int,
- `dish_id` varchar(50) ,
- `dish_name` varchar(100) ,
- `standard_code` varchar(32) ,
- `standard_id` varchar(100) ,
- `dish_price` decimal(16,2),
- `served_quantity` int(11),
- `dish_abnormal_status` varchar(20),
- `ts` varchar(20),
- `taste_type_id` varchar(255),
- `taste_name` varchar(255)
- )
- DUPLICATE KEY(order_time)
- PARTITION BY RANGE(order_time) (
- PARTITION P_202010 VALUES [("2020-10-01"),("2020-11-01")),
- PARTITION P_202011 VALUES [("2020-11-01"),("2020-12-01"))
- )
- DISTRIBUTED BY HASH(shop_id) BUCKETS 8
- PROPERTIES(
- "replication_num" = "2",
- "dynamic_partition.enable" = "true",
- "dynamic_partition.time_unit" = "MONTH",
- "dynamic_partition.start" = "-2147483648",
- "dynamic_partition.end" = "2",
- "dynamic_partition.prefix" = "P_",
- "dynamic_partition.buckets" = "8",
- "dynamic_partition.time_zone" = "Asia/Shanghai"
- );
- 说明:
- replication_num : 副本数
- dynamic_partition.enable: 是否开启动态分区
- dynamic_partition.time_unit: 动态分区调度的单位(HOUR、DAY、WEEK、MONTH)
- 当指定为 HOUR 时,动态创建的分区名后缀格式为 yyyyMMddHH,例如2020032501
- 当指定为 DAY 时,动态创建的分区名后缀格式为 yyyyMMdd,例如20200325
- 当指定为 WEEK 时,动态创建的分区名后缀格式为yyyy_ww。即当前日期属于这一年的第几周,例如 2020- 03-25 创建的分区名后缀为 2020_13, 表明目前为2020年第13周
- 当指定为 MONTH 时,动态创建的分区名后缀格式为 yyyyMM,例如 202003
- dynamic_partition.start: 动态分区的起始偏移,为负数根据 time_unit 属性的不同,以当天(星期/月) 为基准,分区范围在此偏移之前的分区将会被删除。如果不填写,则默认为 -2147483648,即不删除历史 分区
- dynamic_partition.end:动态分区的结束偏移,为正数。根据 time_unit 属性的不同,以当天(星期/月)为基准, 提前创建对应范围的分区
- dynamic_partition.prefix:动态创建的分区名前缀。
- dynamic_partition.buckets:动态创建的分区所对应的分桶数量
- dynamic_partition.time_zone:动态分区的时区
-
- CREATE TABLE `t_pro_dish_list_detail_test_demo` (
- `create_time` date,
- `pro_id` varchar(255),
- `id` int(11)
- )
- DUPLICATE KEY(create_time)
- PARTITION BY RANGE(create_time) (
- PARTITION P_202011 VALUES [("2020-11-01"),("2020-12-01"))
- )
- DISTRIBUTED BY HASH(pro_id) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2",
- "dynamic_partition.enable" = "true",
- "dynamic_partition.time_unit" = "MONTH",
- "dynamic_partition.start" = "-2147483648",
- "dynamic_partition.end" = "2",
- "dynamic_partition.prefix" = "P_",
- "dynamic_partition.buckets" = "2"
- );

- curl -i -v --location-trusted -u root: -T /root/test/test.csv http://10.220.147.155:8030/api/demo/t_pro_dish_list_detail_test_demo/_load?label=2020-092232-01&column_separator=%2c
- 说明:
- label 唯一标识
- column_separator: 用于指定列与列之间的分隔符,默认的为'\t'
- NOTE: 需要进行url编码,譬如
- 需要指定'\t'为分隔符,那么应该传入'column_separator=%09'
- 需要指定'\x01'为分隔符,那么应该传入'column_separator=%01'
- 需要指定','为分隔符,那么应该传入'column_separator=%2c'
- 1.show load可以查看导入信息,及错误信息
- 2.页面system->jobs->操作的数据库名称(DbNAME)->load查看导入信息,及错误信息
- Reason: actual column number is less than schema column number. actual number: 1 sep: , schema number: 3;
- 原因:分割符没有生效,临时跳转的时候column_separator=%2c这个参数被丢弃了
- 解决方法:
- curl -i -v --location-trusted -u root: -T /root/test/test.csv http://10.220.147.155:8030/api/demo/t_pro_dish_list_detail_test_demo/_load?label=2020-092232-01\&column_separator=%2c
- 加转译符"\"
- 1.Client 向 FE 提交一个例行导入作业。
- 2.FE 通过 JobScheduler 将一个导入作业拆分成若干个 Task。每个 Task 负责导入指定的一部分数据。Task 被 3.TaskScheduler 分配到指定的 BE 上执行。
- 4.在 BE 上,一个 Task 被视为一个普通的导入任务,通过 Stream Load 的导入机制进行导入。导入完成后,向 FE 汇报。
- 5.FE 中的 JobScheduler 根据汇报结果,继续生成后续新的 Task,或者对失败的 Task 进行重试。
- 整个例行导入作业通过不断的产生新的 Task,来完成数据不间断的导入
- 1.功能:
- 支持用户提交一个常驻的导入任务,通过不断的从指定的数据源读取数据,将数据导入到 Doris 中。
- 目前仅支持通过无认证或者 SSL 认证方式,从 Kakfa 导入文本格式(CSV)的数据。
- 2.语法:
- CREATE ROUTINE LOAD [db.]job_name ON tbl_name
- [merge_type]
- [load_properties]
- [job_properties]
- FROM data_source
- [data_source_properties]
- 说明:
- 1. [db.]job_name
- 导入作业的名称,在同一个 database 内,相同名称只能有一个 job 在运行。
- 2. tbl_name
- 指定需要导入的表的名称。
- 3. merge_type
- 数据的合并类型,一共支持三种类型APPEND、DELETE、MERGE 其中,APPEND是默认值,表示这批数据全部需要追加到现有数据中,DELETE 表示删除与这批数据key相同的所有行,MERGE 语义 需要与delete on条件联合使用,表示满足delete 条件的数据按照DELETE 语义处理其余的按照APPEND 语义处理, 语法为[WITH MERGE|APPEND|DELETE]
- 4. load_properties
- 用于描述导入数据。语法:
- [column_separator],
- [columns_mapping],
- [where_predicates],
- [delete_on_predicates],
- [source_sequence],
- [partitions]
- 1. column_separator:
- 指定列分隔符,如:COLUMNS TERMINATED BY ",",默认为:\t
- 2. columns_mapping: 指定源数据中列的映射关系,以及定义衍生列的生成方式。
- 1. 映射列:按顺序指定,源数据中各个列,对应目的表中的哪些列。对于希望跳过的列,可以指定一个不存在的列名。假设目的表有三列 k1, k2, v1。源数据有4列,其中第1、2、4列分别对应 k2, k1, v1。则书写如下:
- COLUMNS (k2, k1, xxx, v1)
- 其中 xxx 为不存在的一列,用于跳过源数据中的第三列。
- 2. 衍生列:以 col_name = expr 的形式表示的列,我们称为衍生列。即支持通过 expr 计算得出目的表中对应列的值。衍生列通常排列在映射列之后,虽然这不是强制的规定,但是 Doris 总是先解析映射列,再解析衍生列。
- 接上一个示例,假设目的表还有第4列 v2,v2 由 k1 和 k2 的和产生。则可以书写如下:
- COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
-
- 3. where_predicates
- 用于指定过滤条件,以过滤掉不需要的列。过滤列可以是映射列或衍生列。
- 例如我们只希望导入 k1 大于 100 并且 k2 等于 1000 的列,则书写如下:
- WHERE k1 > 100 and k2 = 1000
- 4. partitions
- 指定导入目的表的哪些 partition 中。如果不指定,则会自动导入到对应的 partition 中。
- 示例:
- PARTITION(p1, p2, p3)
- 5. delete_on_predicates
- 表示删除条件,仅在 merge type 为MERGE 时有意义,语法与where 相同
- 6. source_sequence:
- 只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, source_sequence可以是数据源中的列,也可以是表结构中的一列。
- 5. job_properties
- 用于指定例行导入作业的通用参数。
- 语法:
- PROPERTIES (
- "key1" = "val1",
- "key2" = "val2"
- )

- 1.desired_concurrent_number
- 期望的并发度。一个例行导入作业会被分成多个子任务执行。这个参数指定一个作业最多有多少任务可以同时执行。必须大于0。默认为3。这个并发度并不是实际的并发度,实际的并发度,会通过集群的节点数、负载情况,以及数据源的情况综合考虑。 例:"desired_concurrent_number" = "3"
- 2.max_batch_interval/max_batch_rows/max_batch_size
- 这三个参数分别表示:
- 1)每个子任务最大执行时间,单位是秒。范围为 5 到 60。默认为10。
- 2)每个子任务最多读取的行数。必须大于等于200000。默认是200000。
- 3)每个子任务最多读取的字节数。单位是字节,范围是 100MB 到 1GB。默认是 100MB
- 这三个参数,用于控制一个子任务的执行时间和处理量。当任意一个达到阈值,则任务结束。
- 例: "max_batch_interval" = "20",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200"
- 3.max_error_number
- 采样窗口内,允许的最大错误行数。必须大于等于0。默认是 0,即不允许有错误行。采样窗口为 max_batch_rows * 10。即如果在采样窗口内,错误行数大于 max_error_number,则会导致例行作业被暂停,需要人工介入检查数据质量问题。 被 where 条件过滤掉的行不算错误行。
- 4.strict_mode
- 是否开启严格模式,默认为关闭。如果开启后,非空原始数据的列类型变换如果结果为 NULL,则会被过滤。指定方式为 "strict_mode" = "true"
- 5.timezone
- 指定导入作业所使用的时区。默认为使用 Session 的 timezone 参数。该参数会影响所有导入涉及的和时区有关的函数结果。
- 6.format
- 指定导入数据格式,默认是csv,支持json格式。"format" = "json"
- 7.jsonpaths
- 导入json方式分为:简单模式和匹配模式。如果设置了jsonpath则为匹配模式导入,否则为简单模式导入
- "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]"
- 8.json_root
- json_root为合法的jsonpath字符串,用于指定json document的根节点,默认值为"",
- {
- "data": [{
- "category": "11",
- "title": "SayingsoftheCentury",
- "price": 895,
- "timestamp": 1589191587
- },
- {
- "category": "22",
- "author": "2avc",
- "price": 895,
- "timestamp": 1589191487
- },
- {
- "category": "33",
- "author": "3avc",
- "title": "SayingsoftheCentury",
- "timestamp": 1589191387
- }
- ]
- }
- "json_root" = "$.data"
- 9.strip_outer_array
- 布尔类型,为true表示json数据以数组对象开始且将数组对象中进行展平,默认值是false。
- [{
- "category": "11",
- "title": "SayingsoftheCentury",
- "price": 895,
- "timestamp": 1589191587
- },
- {
- "category": "22",
- "author": "2avc",
- "price": 895,
- "timestamp": 1589191487
- },
- {
- "category": "33",
- "author": "3avc",
- "title": "SayingsoftheCentury",
- "timestamp": 1589191387
- }
- ]
- "strip_outer_array" = "true"
- 10.data_source
- 数据源的类型 FROM KAFKA
- 11.data_source_properties
- 指定数据源相关的信息。
- 语法:(
- "key1" = "val1",
- "key2" = "val2"
- )
- 例:
- (
- "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
- "kafka_topic" = "test_doris_kafka_load",
- "kafka_partitions" = "0",
- "kafka_offsets" = "0,OFFSET_BEGINNING,OFFSET_END"
- );
- OFFSET_BEGINNING: 从有数据的位置开始订阅
- OFFSET_END: 从末尾开始订阅
- 12.导入数据格式样例
- 整型类(TINYINT/SMALLINT/INT/BIGINT/LARGEINT):1, 1000, 1234
- 浮点类(FLOAT/DOUBLE/DECIMAL):1.1, 0.23, .356
- 日期类(DATE/DATETIME):2017-10-03, 2017-06-13 12:34:03。
- 字符串类(CHAR/VARCHAR)(无引号):I am a student, a
- NULL值:\N

- ####示例1
- 1.创建表
- CREATE TABLE `example_table` (
- `id` int,
- `name` varchar(11),
- `age` int,
- `address` varchar(50)
- )
- DISTRIBUTED BY HASH(id) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2"
- );
- 2.创建ROUTINE
- CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_table
- COLUMNS(id,age,name,address)
- PROPERTIES
- (
- "desired_concurrent_number"="2",
- "max_batch_interval" = "20",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200",
- "strict_mode" = "false",
- "format" = "json"
- )
- FROM KAFKA
- (
- "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
- "kafka_topic" = "test_doris_kafka_load",
- "kafka_partitions" = "0",
- "kafka_offsets" = "0"
- );
- 说明:
- example_db 数据库
- example_table 表名称
- test_json_label_1 唯一任务标识
- 格式:
- {
- "id": 1,
- "age": 18,
- "name": "曹丽娜",
- "address": "china"
- }
- ####示例2
- 1.创建表
- CREATE TABLE `example_tbl` (
- `category` varchar(24) NULL COMMENT "",
- `author` varchar(24) NULL COMMENT "",
- `timestamp` bigint(20) NULL COMMENT "",
- `dt` int(11) NULL COMMENT "",
- `price` double REPLACE
- ) ENGINE=OLAP
- AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
- COMMENT "OLAP"
- PARTITION BY RANGE(`dt`)
- (PARTITION p0 VALUES [("-2147483648"), ("20200509")),
- PARTITION p20200509 VALUES [("20200509"), ("20200510")),
- PARTITION p20200510 VALUES [("20200510"), ("20200511")),
- PARTITION p20200511 VALUES [("20200511"), ("20200512")))
- DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
- PROPERTIES (
- "storage_type" = "COLUMN",
- "replication_num" = "1"
- );
- 2.创建ROUTINE
- CREATE ROUTINE LOAD example_db.test1 ON example_tbl
- COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
- PROPERTIES
- (
- "desired_concurrent_number"="2",
- "max_batch_interval" = "20",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200",
- "strict_mode" = "false",
- "format" = "json",
- "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
- "strip_outer_array" = "true"
- )
- FROM KAFKA
- (
- "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
- "kafka_topic" = "test_doris_kafka_load",
- "property.group.id" = "test1",
- "property.client.id" = "test1",
- "kafka_partitions" = "0",
- "kafka_offsets" = "0"
- );
- 3.格式
- [{
- "category": "11",
- "title": "SayingsoftheCentury",
- "price": 895,
- "timestamp": 1589191587
- },
- {
- "category": "22",
- "author": "2avc",
- "price": 895,
- "timestamp": 1589191487
- },
- {
- "category": "33",
- "author": "3avc",
- "title": "SayingsoftheCentury",
- "timestamp": 1589191387
- }
- ]
- ####示例3
- CREATE ROUTINE LOAD example_db.test3 ON example_tbl
- COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
- PROPERTIES
- (
- "desired_concurrent_number"="2",
- "max_batch_interval" = "20",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200",
- "strict_mode" = "false",
- "format" = "json",
- "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
- "strip_outer_array" = "true"
- "json_root" = "$.data"
- )
- FROM KAFKA
- (
- "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
- "kafka_topic" = "test_doris_kafka_load",
- "kafka_partitions" = "0",
- "kafka_offsets" = "8"
- );
- 格式:
- {
- "data": [{
- "category": "11",
- "title": "SayingsoftheCentury",
- "price": 895,
- "timestamp": 1589191587
- },
- {
- "category": "22",
- "author": "2avc",
- "price": 895,
- "timestamp": 1589191487
- },
- {
- "category": "33",
- "author": "3avc",
- "title": "SayingsoftheCentury",
- "timestamp": 1589191387
- }
- ]
- }
- ####示例4
- 为 example_db 的 example_tbl 创建一个名为 test1 的 Kafka 例行导入任务。并且删除与v3 >100 行相匹配的key列的行
- CREATE ROUTINE LOAD example_db.test1 ON example_tbl
- WITH MERGE
- COLUMNS(k1, k2, k3, v1, v2, v3),
- WHERE k1 > 100 and k2 like "%doris%",
- DELETE ON v3 >100
- PROPERTIES
- (
- "desired_concurrent_number"="3",
- "max_batch_interval" = "20",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200",
- "strict_mode" = "false"
- )
- FROM KAFKA
- (
- "kafka_broker_list" = "test-dev-bigdata5:9092,test-dev-bigdata6:9092,test-dev-bigdata7:9092",
- "kafka_topic" = "test_doris_kafka_load",
- "kafka_partitions" = "0",
- "kafka_offsets" = "8"
- );
- ####示例5
- 导入数据到含有sequence列的UNIQUE_KEYS表中
- CREATE ROUTINE LOAD example_db.test_job ON example_tbl
- COLUMNS TERMINATED BY ",",
- COLUMNS(k1,k2,source_sequence,v1,v2),
- ORDER BY source_sequence
- PROPERTIES
- (
- "desired_concurrent_number"="3",
- "max_batch_interval" = "30",
- "max_batch_rows" = "300000",
- "max_batch_size" = "209715200"
- ) FROM KAFKA
- (
- "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
- "kafka_topic" = "my_topic",
- "kafka_partitions" = "0,1,2,3",
- "kafka_offsets" = "101,0,0,200"
- );

- 查看test1的运行状态
- SHOW ROUTINE LOAD TASK WHERE JobName = "test1";
- 停止test1的运行状态
- STOP ROUTINE LOAD FOR test1;
- 暂停test1的运行状态
- PAUSE ROUTINE LOAD FOR test1;
- 恢复名称为 test1 的例行导入作业。
- RESUME ROUTINE LOAD FOR test1;
system->routine_loads
- 1.一直提交,但是没有数据进来,不会报错
- 原因:连接kafka集群的时候采用ip连接就会出现上面那种情况,建议换成主机名
- 2.出现消费kafka任务终止的情况,是因为下面那三个参数没有设置合理
- max_batch_interval/max_batch_rows/max_batch_size
- 这三个参数用于控制单个任务的执行时间。其中任意一个阈值达到,则任务结束。其中 max_batch_rows 用于记录从 Kafka 中读取到的数据行数。max_batch_size 用于记录从 Kafka 中读取到的数据量,单位是字节。目前一个任务的消费速率大约为 5-10MB/s。
- 那么假设一行数据 500B,用户希望每 100MB 或 10 秒为一个 task。100MB 的预期处理时间是 10-20 秒,对应的行数约为 200000 行。则一个合理的配置为:
- "max_batch_interval" = "10",
- "max_batch_rows" = "200000",
- "max_batch_size" = "104857600"
- 这三个参数需要合理设置
- 用户在提交导入任务后,FE 会生成对应的 Plan 并根据目前 BE 的个数和文件的大小,将 Plan 分给 多个 BE 执行,每个 BE 执行一部分导入数据。
- BE 在执行的过程中会从 Broker 拉取数据,在对数据 transform 之后将数据导入系统。所有 BE 均完成导入,由 FE 最终决定导入是否成功。
- 添加
- ALTER SYSTEM ADD BROKER broker_name_1 "test-dev-bigdata5:8000";
- ALTER SYSTEM ADD BROKER broker_name_2 "test-dev-bigdata6:8000";
- ALTER SYSTEM ADD BROKER broker_name_3 "test-dev-bigdata7:8000";
-
- 删除
- ALTER SYSTEM DROP BROKER broker_name01 "test-pro-doris-01:8000";
- ####多个表导入
- 1.创建表
- CREATE TABLE `test1` (
- `id` int,
- `name` varchar(11)
- )
- DISTRIBUTED BY HASH(id) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2"
- );
-
- CREATE TABLE `test2` (
- `col1` int,
- `col2` varchar(11)
- )
- DISTRIBUTED BY HASH(id) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2"
- );
- 2.创建LABEL
- LOAD LABEL example_db.label1
- (
- DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file")
- INTO TABLE test1
- COLUMNS TERMINATED BY ","
- (id,name)
- ,
- DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file1")
- INTO TABLE test2
- COLUMNS TERMINATED BY ","
- (col1, col2)
- )
- WITH BROKER 'broker_name_2'
- PROPERTIES
- (
- "timeout" = "3600"
- );

- 1.FE 调度提交 ETL 任务到 Spark 集群执行。
- 2.Spark 集群执行 ETL 完成对导入数据的预处理。包括全局字典构建(BITMAP类型)、分区、排序、聚合等。
- 3.ETL 任务完成后,FE 获取预处理过的每个分片的数据路径,并调度相关的 BE 执行 Push 任务。
- 4.BE 通过 Broker 读取数据,转化为 Doris 底层存储格式。
- 5.FE 调度生效版本,完成导入任务。
- 1下载spark依赖包
- 2.tar -zxvf spark-1.5.1-bin-hadoop2.4.tgz 解压即可
- 3.将spark客户端下的jars文件夹内所有jar包归档打包成一个zip文件
- 4.安装yarn客户端
- 5.vim ../fe.conf添加
- enable_spark_load = true
- spark_home_default_dir = /usr/local/spark2
- spark_resource_path = /usr/local/spark2/spark-2x.zip
- yarn_client_path = /usr/local/hadoop/bin/yarn
- ####语法:
- -- create spark resource 创建
- CREATE EXTERNAL RESOURCE resource_name
- PROPERTIES
- (
- type = spark,
- spark_conf_key = spark_conf_value,
- working_dir = path,
- broker = broker_name,
- broker.property_key = property_value
- )
-
- -- drop spark resource 删除
- DROP RESOURCE resource_name
-
- -- show resources 查看
- SHOW RESOURCES
- SHOW PROC "/resources"
-
- -- privileges 赋权限
- GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
- GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
-
- -- 例子:授予spark0资源的使用权限给用户user0
- GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-
- ####参数说明
- resource_name 为 Doris 中配置的 Spark 资源的名字
- Spark 相关参数如下:
- spark.master: 必填,目前支持yarn,spark://host:port。
- spark.submit.deployMode: Spark 程序的部署模式,必填,支持 cluster,client 两种。
- spark.hadoop.yarn.resourcemanager.address: master为yarn时必填。
- spark.hadoop.fs.defaultFS: master为yarn时必填。
- 其他参数为可选,参考http://spark.apache.org/docs/latest/configuration.html
- working_dir: ETL 使用的目录。spark作为ETL资源使用时必填。例如:hdfs://host:port/tmp/doris。
- broker: broker 名字。spark作为ETL资源使用时必填。需要使用ALTER SYSTEM ADD BROKER 命令提前完成配置。
- broker.property_key: broker读取ETL生成的中间文件时需要指定的认证信息等。
-
- ####示例配置 ETL 集群cluster模式
- CREATE EXTERNAL RESOURCE "spark6"
- PROPERTIES
- (
- "type" = "spark",
- "spark.master" = "yarn",
- "spark.submit.deployMode" = "cluster",
- "spark.executor.memory" = "1g",
- "spark.yarn.queue" = "queue0",
- "spark.hadoop.yarn.resourcemanager.address" = "hdfs://test-dev-bigdata1:8032",
- "spark.hadoop.fs.defaultFS" = "hdfs://test-dev-bigdata1:8020",
- "working_dir" = "hdfs://test-dev-bigdata1:8020/tmp/doris",
- "broker" = "broker_name_1"
- );
-
- ####client模式
- CREATE EXTERNAL RESOURCE "spark2"
- PROPERTIES
- (
- "type" = "spark",
- "spark.master" = "spark://10.220.147.155:7077",
- "spark.submit.deployMode" = "client",
- "working_dir" = "hdfs://10.220.147.151:8020/tmp/doris",
- "broker" = "broker_name_2"
- );
-

- ####上游数据源是hive表的情况
- ####step 1:新建hive外部表
- CREATE EXTERNAL TABLE hive_t2
- (
- orderid INT,
- createtime varchar(25),
- modifiedtime varchar(50),
- status varchar(100),
- dt varchar(100)
- )
- ENGINE=hive
- properties
- (
- "database" = "test",
- "table" = "ods_orders",
- "hive.metastore.uris" = "thrift://10.220.147.151:9083"
- );
- ####step 2:创建doris表
- CREATE TABLE `test_hive` (
- orderid INT,
- createtime varchar(25),
- modifiedtime varchar(50),
- status varchar(100),
- dt varchar(100)
- )
- DISTRIBUTED BY HASH(orderid) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2"
- );
- #####step 3:提交load命令,要求导入的 doris 表中的列必须在 hive 外部表中存在。
- LOAD LABEL demo.label25
- (
- DATA FROM TABLE hive_t2
- INTO TABLE test_hive
- )
- WITH RESOURCE 'spark0'
- (
- "spark.executor.memory" = "2g",
- "spark.shuffle.compress" = "true"
- )
- PROPERTIES
- (
- "timeout" = "3600" //注:数据量特别大的,时间可以设置长
- );
-
- ####上游数据源为hdfs文件的情况
- ####step 1:创建表
- CREATE TABLE `test3` (
- `id` int,
- `name` varchar(11)
- )
- DISTRIBUTED BY HASH(id) BUCKETS 2
- PROPERTIES(
- "replication_num" = "2"
- );
- ####step 2:提交load命令
- LOAD LABEL demo.label23
- (
- DATA INFILE("hdfs://10.220.147.151:8020/tmp/palo/file1")
- INTO TABLE test3
- COLUMNS TERMINATED BY ","
- (id,name)
-
- )
- WITH RESOURCE 'spark0'
- (
- "spark.executor.memory" = "2g",
- "spark.shuffle.compress" = "true"
- )
- PROPERTIES
- (
- "timeout" = "3600"
- );

- 1.type:ETL_SUBMIT_FAIL; msg:errCode = 2, detailMessage = start spark app failed. error: Waiting too much time to get appId from handle. spark app state: UNKNOWN, loadJobId:16020
- 出现上面这个情况需要去看具体的错误,目录:/soft/doris-fe/log/spark_launcher_log
- 2.1064 - errCode = 2, detailMessage = Spark Load is coming soon
- 打压缩包的时候只需要把jar文件下的包打进去,不需要把jar目录打进去
- 3.type:ETL_SUBMIT_FAIL; msg:errCode = 2, detailMessage = errCode = 2, detailMessage = failed to upload lib to repository, srcPath=/usr/local/spark2/spark-2x.zip destPath=hdfs://10.220.147.151:8020/tmp/doris/2113522669/__spark_repository__spark0/__archive_1.0.0/__lib__spark-2x.zip message=errCode = 2, detailMessage = Read file exception. filePath=/usr/local/spark2/spark-2x.zip
- 原因:配置 fe.conf spark_resource_path = /usr/local/spark2/spark-2x.zip 这个地址写错了
- 4. 查看yarn上面的报错信息:Path does not exist: hdfs://10.220.147.151:8020/tmp/doris/jobs/11001/label17/18009/configs/jobconfig.json;
- fe.log的错误信息是:$JAVA_HOEM not set
- 原因java_home没有设置,查看JAVA_HOME是否配置,如果配置,则需要在hadoop/libexec/hadoop-config.sh添加export JAVA_HOEM=/usr/local/java,因为在fe要用hadoop/libexec/hadoop-config.sh
- 添加分区
- ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta ADD PARTITION P_20150101 VALUES [("2015-01-01 00:00:00"),("2016-01-01 00:00:00"));
- 删除分区
- ALTER TABLE ods_pos_pro_dish_list_detail_delta DROP PARTITION P_20200101;
- 设置动态分区
- ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta SET
- (
- "dynamic_partition.enable" = "true"
- );
- 修改副本数
- ALTER TABLE ods.ods_pos_pro_dish_list_detail_delta SET
- (
- "replication_num" = "6"
- );
- 1) 设置数据库数据量配额,单位为B/K/KB/M/MB/G/GB/T/TB/P/PB
- ALTER DATABASE db_name SET DATA QUOTA quota;
- 2) 重命名数据库
- ALTER DATABASE db_name RENAME new_db_name;
- 3) 设置数据库的副本数量配额
- ALTER DATABASE db_name SET REPLICA QUOTA quota;
- 添加字段
- ALTER TABLE example_db.my_table ADD COLUMN dish_type_code varchar(20) DEFAULT NULL
- 添加字段时出错:errCode = 2, detailMessage = Create replicas failed. Error: Error replicas:10003=5341268, 10003=5341296, 10003=5341288
- 解决: ADMIN SET FRONTEND CONFIG ("tablet_create_timeout_second"="10");
- ADMIN SET FRONTEND CONFIG ("max_create_table_timeout_second"="1000"); 修改字段注释
- ALTER TABLE tb_user MODIFY COLUMN name VARCHAR(30) NOT NULL COMMENT '姓名2';

- SELECT * FROM ods_pos_pro_taste_name_delta
- INTO OUTFILE "hdfs://test-pro-cdh-namenode1:8020/tmp/palo/result_"
- FORMAT AS CSV
- PROPERTIES
- (
- "broker.name" = "broker_name01",
- "column_separator" = ",",
- "line_delimiter" = "\n",
- "max_file_size" = "10MB"
- );
-
- ###参数介绍
- column_separator:列分隔符,仅对 CSV 格式适用。默认为 \t。
- line_delimiter:行分隔符,仅对 CSV 格式适用。默认为 \n。
- max_file_size:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。
- ####创建语法(help CREATE REPOSITORY;)
- 该语句用于创建仓库。仓库用于属于备份或恢复。仅 root 或 superuser 用户可以创建仓库。
- 语法:
- CREATE [READ ONLY] REPOSITORY `repo_name`
- WITH BROKER `broker_name`
- ON LOCATION `repo_location`
- PROPERTIES ("key"="value", ...);
-
- 说明:
- 1. 仓库的创建,依赖于已存在的 broker
- 2. 如果是只读仓库,则只能在仓库上进行恢复。如果不是,则可以进行备份和恢复操作。
- 3. 根据 broker 的不同类型,PROPERTIES 有所不同,具体见示例。
- Examples:
- 1. 创建名为 bos_repo 的仓库,依赖 BOS broker "bos_broker",数据根目录为:bos://palo_backup
- CREATE REPOSITORY `bos_repo`
- WITH BROKER `bos_broker`
- ON LOCATION "bos://palo_backup"
- PROPERTIES
- (
- "bos_endpoint" = "http://gz.bcebos.com",
- "bos_accesskey" = "069fc2786e664e63a5f111111114ddbs22",
- "bos_secret_accesskey"="70999999999999de274d59eaa980a"
- );
-
- 2. 创建和示例 1 相同的仓库,但属性为只读:
- CREATE READ ONLY REPOSITORY `bos_repo`
- WITH BROKER `bos_broker`
- ON LOCATION "bos://palo_backup"
- PROPERTIES
- (
- "bos_endpoint" = "http://gz.bcebos.com",
- "bos_accesskey" = "069fc2786e664e63a5f111111114ddbs22",
- "bos_secret_accesskey"="70999999999999de274d59eaa980a"
- );
-
- 3. 创建名为 hdfs_repo 的仓库,依赖 Baidu hdfs broker "hdfs_broker",数据根目录为:hdfs://hadoop-name-node:54310/path/to/repo/
- CREATE REPOSITORY `hdfs_repo`
- WITH BROKER `hdfs_broker`
- ON LOCATION "hdfs://hadoop-name-node:54310/path/to/repo/"
- PROPERTIES
- (
- "username" = "user",
- "password" = "password"
- );
- ##查看远程仓库命令
- show REPOSITORY;
-
- ##备份(help BACKUP)
- 该语句用于备份指定数据库下的数据。该命令为异步操作。提交成功后,需通过 SHOW BACKUP 命令查看进度。仅支持备份 OLAP 类型的表。
- 语法:
- BACKUP SNAPSHOT [db_name].{snapshot_name}
- TO `repository_name`
- ON (
- `table_name` [PARTITION (`p1`, ...)],
- ...
- )
- PROPERTIES ("key"="value", ...);
-
- 说明:
- 1. 同一数据库下只能有一个正在执行的 BACKUP 或 RESTORE 任务。
- 2. ON 子句中标识需要备份的表和分区。如果不指定分区,则默认备份该表的所有分区。
- 3. PROPERTIES 目前支持以下属性:
- "type" = "full":表示这是一次全量更新(默认)。
- "timeout" = "3600":任务超时时间,默认为一天。单位秒。
- Examples:
-
- 1. 全量备份 example_db 下的表 example_tbl 到仓库 example_repo 中:
- BACKUP SNAPSHOT example_db.snapshot_label1
- TO example_repo
- ON (example_tbl)
- PROPERTIES ("type" = "full");
-
- 2. 全量备份 example_db 下,表 example_tbl 的 p1, p2 分区,以及表 example_tbl2 到仓库 example_repo 中:
- BACKUP SNAPSHOT example_db.snapshot_label2
- TO example_repo
- ON
- (
- example_tbl PARTITION (p1,p2),
- example_tbl2
- );
- ###查看最近一次 backup 作业的执行情况,包括
- JobId:本次备份作业的 id。
- SnapshotName:用户指定的本次备份作业的名称(Label)。
- DbName:备份作业对应的 Database。
- State:备份作业当前所在阶段:
- PENDING:作业初始状态。
- SNAPSHOTING:正在进行快照操作。
- UPLOAD_SNAPSHOT:快照结束,准备上传。
- UPLOADING:正在上传快照。
- SAVE_META:正在本地生成元数据文件。
- UPLOAD_INFO:上传元数据文件和本次备份作业的信息。
- FINISHED:备份完成。
- CANCELLED:备份失败或被取消。
- BackupObjs:本次备份涉及的表和分区的清单。
- CreateTime:作业创建时间。
- SnapshotFinishedTime:快照完成时间。
- UploadFinishedTime:快照上传完成时间。
- FinishedTime:本次作业完成时间。
- UnfinishedTasks:在 SNAPSHOTTING,UPLOADING 等阶段,会有多个子任务在同时进行,这里展示的当前阶段,未完成的子任务的 task id。
- TaskErrMsg:如果有子任务执行出错,这里会显示对应子任务的错误信息。
- Status:用于记录在整个作业过程中,可能出现的一些状态信息。
- Timeout:作业的超时时间,单位是秒。
- ###命令:SHOW BACKUP
- ###查看远端仓库中已存在的备份。
- 1. 查看仓库 example_repo 中已有的备份:
- SHOW SNAPSHOT ON example_repo;
-
- 2. 仅查看仓库 example_repo 中名称为 backup1 的备份:
- SHOW SNAPSHOT ON example_repo WHERE SNAPSHOT = "backup1";
-
- 3. 查看仓库 example_repo 中名称为 backup1 的备份,时间版本为 "2018-05-05-15-34-26" 的详细信息:
- SHOW SNAPSHOT ON example_repo
- WHERE SNAPSHOT = "backup1" AND TIMESTAMP = "2018-05-05-15-34-26";
-
- ###参数说明:
- Snapshot:备份时指定的该备份的名称(Label)。
- Timestamp:备份的时间戳。
- Status:该备份是否正常。
-
- ###恢复(help RESTORE)
- 1. RESTORE
- 该语句用于将之前通过 BACKUP 命令备份的数据,恢复到指定数据库下。该命令为异步操作。提交成功后,需通过 SHOW RESTORE 命令查看进度。仅支持恢复 OLAP 类型的表。
- 语法:
- RESTORE SNAPSHOT [db_name].{snapshot_name}
- FROM `repository_name`
- ON (
- `table_name` [PARTITION (`p1`, ...)] [AS `tbl_alias`],
- ...
- )
- PROPERTIES ("key"="value", ...);
-
- 说明:
- 1. 同一数据库下只能有一个正在执行的 BACKUP 或 RESTORE 任务。
- 2. ON 子句中标识需要恢复的表和分区。如果不指定分区,则默认恢复该表的所有分区。所指定的表和分区必须已存在于仓库备份中。
- 3. 可以通过 AS 语句将仓库中备份的表名恢复为新的表。但新表名不能已存在于数据库中。分区名称不能修改。
- 4. 可以将仓库中备份的表恢复替换数据库中已有的同名表,但须保证两张表的表结构完全一致。表结构包括:表名、列、分区、Rollup等等。
- 5. 可以指定恢复表的部分分区,系统会检查分区 Range 是否能够匹配。
- 6. PROPERTIES 目前支持以下属性:
- "backup_timestamp" = "2018-05-04-16-45-08":指定了恢复对应备份的哪个时间版本,必填。该信息可以通过 `SHOW SNAPSHOT ON repo;` 语句获得。
- "replication_num" = "3":指定恢复的表或分区的副本数。默认为3。若恢复已存在的表或分区,则副本数必须和已存在表或分区的副本数相同。同时,必须有足够的 host 容纳多个副本。
- "timeout" = "3600":任务超时时间,默认为一天。单位秒。
- "meta_version" = 40:使用指定的 meta_version 来读取之前备份的元数据。注意,该参数作为临时方案,仅用于恢复老版本 Doris 备份的数据。最新版本的备份数据中已经包含 meta version,无需再指定。
- Examples:
- 1. 从 example_repo 中恢复备份 snapshot_1 中的表 backup_tbl 到数据库 example_db1,时间版本为 "2018-05-04-16-45-08"。恢复为 1 个副本:
- RESTORE SNAPSHOT example_db1.`snapshot_1`
- FROM `example_repo`
- ON ( `backup_tbl` )
- PROPERTIES
- (
- "backup_timestamp"="2018-05-04-16-45-08",
- "replication_num" = "1"
- );
-
- 2. 从 example_repo 中恢复备份 snapshot_2 中的表 backup_tbl 的分区 p1,p2,以及表 backup_tbl2 到数据库 example_db1,并重命名为 new_tbl,时间版本为 "2018-05-04-17-11-01"。默认恢复为 3 个副本:
- RESTORE SNAPSHOT example_db1.`snapshot_2`
- FROM `example_repo`
- ON
- (
- `backup_tbl` PARTITION (`p1`, `p2`),
- `backup_tbl2` AS `new_tbl`
- )
- PROPERTIES
- (
- "backup_timestamp"="2018-05-04-17-11-01"
- );
- ###查看最近一次 restore 作业的执行情况,包括:
- JobId:本次恢复作业的 id。
- Label:用户指定的仓库中备份的名称(Label)。
- Timestamp:用户指定的仓库中备份的时间戳。
- DbName:恢复作业对应的 Database。
- State:恢复作业当前所在阶段:
- PENDING:作业初始状态。
- SNAPSHOTING:正在进行本地新建表的快照操作。
- DOWNLOAD:正在发送下载快照任务。
- DOWNLOADING:快照正在下载。
- COMMIT:准备生效已下载的快照。
- COMMITTING:正在生效已下载的快照。
- FINISHED:恢复完成。
- CANCELLED:恢复失败或被取消。
- AllowLoad:恢复期间是否允许导入。
- ReplicationNum:恢复指定的副本数。
- RestoreObjs:本次恢复涉及的表和分区的清单。
- CreateTime:作业创建时间。
- MetaPreparedTime:本地元数据生成完成时间。
- SnapshotFinishedTime:本地快照完成时间。
- DownloadFinishedTime:远端快照下载完成时间。
- FinishedTime:本次作业完成时间。
- UnfinishedTasks:在 SNAPSHOTTING,DOWNLOADING, COMMITTING 等阶段,会有多个子任务在同时进行,这里展示的当前阶段,未完成的子任务的 task id。
- TaskErrMsg:如果有子任务执行出错,这里会显示对应子任务的错误信息。
- Status:用于记录在整个作业过程中,可能出现的一些状态信息。
- Timeout:作业的超时时间,单位是秒。
- ###命令:SHOW RESTORE
-
- ##取消当前正在执行的备份作业。
- CANCEL BACKUP
-
- ##取消当前正在执行的恢复作业。
- CANCEL RESTORE
-
- ##删除已创建的远端仓库。删除仓库,仅仅是删除该仓库在 Doris 中的映射,不会删除实际的仓库数据。
- DROP REPOSITORY
-
- ###示例
- ## 1.创建远程仓库
- CREATE REPOSITORY `hdfs_repo`
- WITH BROKER `broker_name_2`
- ON LOCATION "hdfs://test-dev-bigdata1:8020/tmp/doris_backup"
- PROPERTIES
- (
- "username" = "",
- "password" = ""
- );
-
- ##2.备份表
- 全量备份 example_db 下的表 example_tbl 到仓库 example_repo 中:
- BACKUP SNAPSHOT ods.snapshot_label1
- TO hdfs_repo
- ON (ods_pos_pro_sell_out_delta)
- PROPERTIES ("type" = "full");
-
- ##3.查看备份任务
- show BACKUP
- ##4.查看SNAPSHOT状态
- SHOW SNAPSHOT ON hdfs_repo WHERE SNAPSHOT = "snapshot_label1";
- ##5.其他集群数据下添加REPOSITORY
- CREATE REPOSITORY `hdfs_repo`
- WITH BROKER `broker_name01`
- ON LOCATION "hdfs://test-dev-bigdata1:8020/tmp/doris_backup"
- PROPERTIES
- (
- "username" = "",
- "password" = ""
- );
- ##6.恢复
- RESTORE SNAPSHOT demo.`snapshot_label1`
- FROM `hdfs_repo`
- ON ( `ods_pos_pro_sell_out_delta` )
- PROPERTIES
- (
- "backup_timestamp"="2020-12-21-13-47-49", //通过SHOW SNAPSHOT ON hdfs_repo WHERE SNAPSHOT = "snapshot_label1";获取
- "replication_num" = "3"
- );
- ## 7.查看恢复状态
- SHOW RESTORE
-
- ##示例2
- ##备份分区表
- BACKUP SNAPSHOT demo.snapshot_label2
- TO hdfs_repo
- ON (
- ods_pos_pro_dish_list_detail_delta_tmp PARTITION (`P_20201101`, `P_20201102`,`P_20201103`,`P_20201104`,`P_20201105`,`P_20201106`,`P_20201107`,`P_20201108`,`P_20201109`,`P_20201110`,`P_20201111`,`P_20201112`)
- )
- PROPERTIES (
- "type" = "full"
- );
- ##迁移
- RESTORE SNAPSHOT ods.`snapshot_label2`
- FROM `hdfs_repo`
- ON ( ods_pos_pro_dish_list_detail_delta_tmp PARTITION (`P_20201101`, `P_20201102`,`P_20201103`,`P_20201104`,`P_20201105`,`P_20201106`,`P_20201107`,`P_20201108`,`P_20201109`,`P_20201110`,`P_20201111`,`P_20201112`)
- )
- PROPERTIES
- (
- "backup_timestamp"="2020-12-21-14-37-45",
- "replication_num" = "3"
- );

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。