赞
踩
#创建test用户
CREATE USER 'test' IDENTIFIED by 'test';
#创建数据库
CREATE DATABASE test_db;
#用户授权
grant all on test_db to test;
在 Doris 中,数据都以关系表(Table)的形式进行逻辑上的描述。
一张表包括行(Row)和列(Column)。Row 即用户的一行数据。Column 用于描述一
行数据中不同的字段。
⚫ 在默认的数据模型中,Column 只分为排序列和非排序列。存储引擎会按照排序列对数据进行排序存储,并建立稀疏索引,以便在排序数据上进行快速查找。
⚫ 而在聚合模型中,Column 可以分为两大类:Key 和 Value。从业务角度看,Key 和Value 可以分别对应维度列和指标列。从聚合模型的角度来说,Key 列相同的行,会聚合成一行。其中 Value 列的聚合方式由用户在建表时指定。
在 Doris 的存储引擎中,用户数据首先被划分成若干个分区(Partition),划分的规则通常是按照用户指定的分区列进行范围划分,比如按时间划分。而在每个分区内,数据被进一步的按照 Hash 的方式分桶,分桶的规则是要找用户指定的分桶列的值进行 Hash 后分桶。每个分桶就是一个数据分片(Tablet),也是数据划分的最小逻辑单元。
⚫ Tablet 之间的数据是没有交集的,独立存储的。Tablet 也是数据移动、复制等操作的最小物理存储单元。
⚫ Partition 可以视为是逻辑上最小的管理单元。数据的导入与删除,都可以或仅能针对一个 Partition 进行。
该语句用于创建 table。
语法:
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [database.]table_name
(column_definition1[, column_definition2, ...]
[, index_definition1[, ndex_definition12,]])
[ENGINE = [olap|mysql|broker|hive]]
[key_desc]
[COMMENT "table comment"];
[partition_desc]
[distribution_desc]
[rollup_index]
[PROPERTIES ("key"="value", ...)]
[BROKER PROPERTIES ("key"="value", ...)]
column_definition
语法:
col_name col_type [agg_type] [NULL | NOT NULL] [DEFAULT "default_value"]
说明:
col_name:列名称
col_type:列类型
BOOLEAN(1字节) 范围:{0,1} TINYINT(1字节) 范围:-2^7 + 1 ~ 2^7 - 1 SMALLINT(2字节) 范围:-2^15 + 1 ~ 2^15 - 1 INT(4字节) 范围:-2^31 + 1 ~ 2^31 - 1 BIGINT(8字节) 范围:-2^63 + 1 ~ 2^63 - 1 LARGEINT(16字节) 范围:-2^127 + 1 ~ 2^127 - 1 FLOAT(4字节) 支持科学计数法 DOUBLE(8字节) 支持科学计数法 DECIMAL[(precision, scale)] (16字节) 保证精度的小数类型。默认是 DECIMAL(10, 0) precision: 1 ~ 27 scale: 0 ~ 9 其中整数部分为 1 ~ 18 不支持科学计数法 DATE(3字节) 范围:0000-01-01 ~ 9999-12-31 DATETIME(8字节) 范围:0000-01-01 00:00:00 ~ 9999-12-31 23:59:59 CHAR[(length)] 定长字符串。长度范围:1 ~ 255。默认为1 VARCHAR[(length)] 变长字符串。长度范围:1 ~ 65533 HLL (1~16385个字节) hll列类型,不需要指定长度和默认值、长度根据数据的聚合 程度系统内控制,并且HLL列只能通过配套的hll_union_agg、Hll_cardinality、hll_hash进行查询或使用 BITMAP bitmap列类型,不需要指定长度和默认值。表示整型的集合,元素最大支持到2^64 - 1
agg_type:聚合类型,如果不指定,则该列为 key 列。否则,该列为 value 列
* SUM、MAX、MIN、REPLACE
* HLL_UNION(仅用于HLL列,为HLL独有的聚合方式)、
* BITMAP_UNION(仅用于 BITMAP 列,为 BITMAP 独有的聚合方式)、
* REPLACE_IF_NOT_NULL:这个聚合类型的含义是当且仅当新导入数据是非NULL值时会发生替换行为,如果新导入的数据是NULL,那么Doris仍然会保留原值。注意:如果用在建表时REPLACE_IF_NOT_NULL列指定了NOT NULL,那么Doris仍然会将其转化NULL,不会向用户报错。用户可以借助这个类型完成部分列导入的功能。
* 该类型只对聚合模型(key_desc的type为AGGREGATE KEY)有用,其它模型不需要指这个。
是否允许为NULL: 默认允许为 NULL。NULL 值在导入数据中用 \N 来表示
注意:
BITMAP_UNION聚合类型列在导入时的原始数据类型必须是TINYINT,SMALLINT,INT,BIGINT。
index_definition
语法:
INDEX index_name (col_name[, col_name, ...]) [USING BITMAP] COMMENT 'xxxxxx'
说明:
index_name:索引名称
col_name:列名
注意:
当前仅支持BITMAP索引, BITMAP索引仅支持应用于单列
ENGINE 类型
默认为 olap。可选 mysql, broker, hive
PROPERTIES (
"host" = "mysql_server_host",
"port" = "mysql_server_port",
"user" = "your_user_name",
"password" = "your_password",
"database" = "database_name",
"table" = "table_name"
)
注意: "table" 条目中的 "table_name" 是 mysql 中的真实表名。 而 CREATE TABLE 语句中的 table_name 是该 mysql 表在 Doris 中的名字,可以不同。 在 Doris 创建 mysql 表的目的是可以通过 Doris 访问 mysql 数据库。 而 Doris 本身并不维护、存储任何 mysql 数据。 2) 如果是 broker,表示表的访问需要通过指定的broker, 需要在 properties 提供以下信息: ``` PROPERTIES ( "broker_name" = "broker_name", "path" = "file_path1[,file_path2]", "column_separator" = "value_separator" "line_delimiter" = "value_delimiter" ) ``` 另外还需要提供Broker需要的Property信息,通过BROKER PROPERTIES来传递,例如HDFS需要传入 ``` BROKER PROPERTIES( "username" = "name", "password" = "password" ) ``` 这个根据不同的Broker类型,需要传入的内容也不相同 注意: "path" 中如果有多个文件,用逗号[,]分割。如果文件名中包含逗号,那么使用 %2c 来替代。如果文件名中包含 %,使用 %25 代替 现在文件内容格式支持CSV,支持GZ,BZ2,LZ4,LZO(LZOP) 压缩格式。 3) 如果是 hive,则需要在 properties 提供以下信息: ``` PROPERTIES ( "database" = "hive_db_name", "table" = "hive_table_name", "hive.metastore.uris" = "thrift://127.0.0.1:9083" ) ``` 其中 database 是 hive 表对应的库名字,table 是 hive 表的名字,hive.metastore.uris 是 hive metastore 服务地址。 注意:目前hive外部表仅用于Spark Load使用,不支持查询。
key_desc
语法:
key_type(k1[,k2 ...])
说明:
数据按照指定的key列进行排序,且根据不同的key_type具有不同特性。
key_type支持以下类型:
AGGREGATE KEY:key列相同的记录,value列按照指定的聚合类型进行聚合,
适合报表、多维分析等业务场景。
UNIQUE KEY:key列相同的记录,value列按导入顺序进行覆盖,
适合按key列进行增删改查的点查询业务。
DUPLICATE KEY:key列相同的记录,同时存在于Doris中,
适合存储明细数据或者数据无聚合特性的业务场景。
默认为DUPLICATE KEY,key列为列定义中前36个字节, 如果前36个字节的列数小于3,将使用前三列。
注意:
除AGGREGATE KEY外,其他key_type在建表时,value列不需要指定聚合类型。
partition_desc
目前支持 RANGE 和 LIST 两种分区方式。
5.1 RANGE 分区
RANGE partition描述有两种使用方式
1) LESS THAN
语法:
``` PARTITION BY RANGE (k1, k2, ...) ( PARTITION partition_name1 VALUES LESS THAN MAXVALUE|("value1", "value2", ...), PARTITION partition_name2 VALUES LESS THAN MAXVALUE|("value1", "value2", ...) ... ) ``` 说明: 使用指定的 key 列和指定的数值范围进行分区。 1) 分区名称仅支持字母开头,字母、数字和下划线组成 2) 目前仅支持以下类型的列作为 Range 分区列 TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME 3) 分区为左闭右开区间,首个分区的左边界为做最小值 4) NULL 值只会存放在包含最小值的分区中。当包含最小值的分区被删除后,NULL 值将无法导入。 5) 可以指定一列或多列作为分区列。如果分区值缺省,则会默认填充最小值。 注意: 1) 分区一般用于时间维度的数据管理 2) 有数据回溯需求的,可以考虑首个分区为空分区,以便后续增加分区 2)Fixed Range 语法: ``` PARTITION BY RANGE (k1, k2, k3, ...) ( PARTITION partition_name1 VALUES [("k1-lower1", "k2-lower1", "k3-lower1",...), ("k1-upper1", "k2-upper1", "k3-upper1", ...)), PARTITION partition_name2 VALUES [("k1-lower1-2", "k2-lower1-2", ...), ("k1-upper1-2", MAXVALUE, )) "k3-upper1-2", ... ) ``` 说明: 1)Fixed Range比LESS THAN相对灵活些,左右区间完全由用户自己确定 2)其他与LESS THAN保持同步
5.2 LIST 分区
LIST partition分为单列分区和多列分区
1) 单列分区
语法:
``` PARTITION BY LIST(k1) ( PARTITION partition_name1 VALUES IN ("value1", "value2", ...), PARTITION partition_name2 VALUES IN ("value1", "value2", ...) ... ) ``` 说明: 使用指定的 key 列和制定的枚举值进行分区。 1) 分区名称仅支持字母开头,字母、数字和下划线组成 2) 目前仅支持以下类型的列作为 List 分区列 BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE, DATETIME, CHAR, VARCHAR 3) 分区为枚举值集合,各个分区之间分区值不能重复 4) 不可导入 NULL 值 5) 分区值不能缺省,必须指定至少一个 2) 多列分区 语法: ``` PARTITION BY LIST(k1, k2) ( PARTITION partition_name1 VALUES IN (("value1", "value2"), ("value1", "value2"), ...), PARTITION partition_name2 VALUES IN (("value1", "value2"), ("value1", "value2"), ...) ... ) ``` 说明: 1) 多列分区的分区是元组枚举值的集合 2) 每个元组值的个数必须与分区列个数相等 3) 其他与单列分区保持同步
distribution_desc
1) Hash 分桶
语法:
DISTRIBUTED BY HASH (k1[,k2 ...]) [BUCKETS num]
说明:
使用指定的 key 列进行哈希分桶。默认分区数为10
建议:建议使用Hash分桶方式
PROPERTIES
PROPERTIES (
"storage_medium" = "[SSD|HDD]",
["storage_cooldown_time" = "yyyy-MM-dd HH:mm:ss"],
["replication_num" = "3"]
["replication_allocation" = "xxx"]
)
storage_medium: 用于指定该分区的初始存储介质,可选择 SSD 或 HDD。默认初始存储介质可通过fe的配置文件 `fe.conf` 中指定 `default_storage_medium=xxx`,如果没有指定,则默认为 HDD。
注意:当FE配置项 `enable_strict_storage_medium_check` 为 `True` 时,若集群中没有设置对应的存储介质时,建表语句会报错 `Failed to find enough host in all backends with storage medium is SSD|HDD`.
storage_cooldown_time: 当设置存储介质为 SSD 时,指定该分区在 SSD 上的存储到期时间。
默认存放 30 天。
格式为:"yyyy-MM-dd HH:mm:ss"
replication_num: 指定分区的副本数。默认为 3。
replication_allocation: 按照资源标签来指定副本分布。
当表为单分区表时,这些属性为表的属性。
当表为两级分区时,这些属性为附属于每一个分区。
如果希望不同分区有不同属性。可以通过 ADD PARTITION 或 MODIFY PARTITION 进行操作
PROPERTIES (
"bloom_filter_columns"="k1,k2,k3"
)
3) 如果希望使用 Colocate Join 特性,需要在 properties 中指定
PROPERTIES (
"colocate_with"="table1"
)
4) 如果希望使用动态分区特性,需要在properties 中指定。注意:动态分区只支持 RANGE 分区
PROPERTIES (
"dynamic_partition.enable" = "true|false",
"dynamic_partition.time_unit" = "HOUR|DAY|WEEK|MONTH",
"dynamic_partition.start" = "${integer_value}",
"dynamic_partition.end" = "${integer_value}",
"dynamic_partition.prefix" = "${string_value}",
"dynamic_partition.buckets" = "${integer_value}
dynamic_partition.enable: 用于指定表级别的动态分区功能是否开启。默认为 true。 dynamic_partition.time_unit: 用于指定动态添加分区的时间单位,可选择为HOUR(小时),DAY(天),WEEK(周),MONTH(月)。 注意:以小时为单位的分区列,数据类型不能为 DATE。 dynamic_partition.start: 用于指定向前删除多少个分区。值必须小于0。默认为 Integer.MIN_VALUE。 dynamic_partition.end: 用于指定提前创建的分区数量。值必须大于0。 dynamic_partition.prefix: 用于指定创建的分区名前缀,例如分区名前缀为p,则自动创建分区名为p20200108 dynamic_partition.buckets: 用于指定自动创建的分区分桶数量 dynamic_partition.create_history_partition: 用于创建历史分区功能是否开启。默认为 false。 dynamic_partition.history_partition_num: 当开启创建历史分区功能时,用于指定创建历史分区数量。 dynamic_partition.reserved_history_periods: 用于指定保留的历史分区的时间段。 5) 建表时可以批量创建多个 Rollup 语法: ``` ROLLUP (rollup_name (column_name1, column_name2, ...) [FROM from_index_name] [PROPERTIES ("key"="value", ...)],...) ``` 6) 如果希望使用 内存表 特性,需要在 properties 中指定
PROPERTIES (
"in_memory"="true"
)
当 in_memory 属性为 true 时,Doris会尽可能将该表的数据和索引Cache到BE 内存中
7) 创建UNIQUE_KEYS表时,可以指定一个sequence列,当KEY列相同时,将按照sequence列进行REPLACE(较大值替换较小值,否则无法替换)
PROPERTIES (
"function_column.sequence_type" = 'Date',
);
sequence_type用来指定sequence列的类型,可以为整型和时间类型
Examples:
创建一个 olap 表,使用 HASH 分桶,使用列存,相同key的记录进行聚合
CREATE TABLE example_db.table_hash
(
k1 BOOLEAN,
k2 TINYINT,
k3 DECIMAL(10, 2) DEFAULT "10.5",
v1 CHAR(10) REPLACE,
v2 INT SUM
)
ENGINE=olap
AGGREGATE KEY(k1, k2, k3)
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32;
创建一个 olap 表,使用 Hash 分桶,使用列存,相同key的记录进行覆盖,
设置初始存储介质和冷却时间
CREATE TABLE example_db.table_hash
(
k1 BIGINT,
k2 LARGEINT,
v1 VARCHAR(2048) REPLACE,
v2 SMALLINT SUM DEFAULT "10"
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH (k1, k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"storage_cooldown_time" = "2015-06-04 00:00:00"
);
创建一个 olap 表,使用 Range 分区,使用Hash分桶,默认使用列存,
相同key的记录同时存在,设置初始存储介质和冷却时间
1)LESS THAN
CREATE TABLE example_db.table_range ( k1 DATE, k2 INT, k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE=olap DUPLICATE KEY(k1, k2, k3) PARTITION BY RANGE (k1) ( PARTITION p1 VALUES LESS THAN ("2014-01-01"), PARTITION p2 VALUES LESS THAN ("2014-06-01"), PARTITION p3 VALUES LESS THAN ("2014-12-01") ) DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "storage_medium" = "SSD", "storage_cooldown_time" = "2015-06-04 00:00:00" );
说明:
这个语句会将数据划分成如下3个分区:
( { MIN }, {"2014-01-01"} )
[ {"2014-01-01"}, {"2014-06-01"} )
[ {"2014-06-01"}, {"2014-12-01"} )
不在这些分区范围内的数据将视为非法数据被过滤
CREATE TABLE table_range ( k1 DATE, k2 INT, k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE=olap DUPLICATE KEY(k1, k2, k3) PARTITION BY RANGE (k1, k2, k3) ( PARTITION p1 VALUES [("2014-01-01", "10", "200"), ("2014-01-01", "20", "300")), PARTITION p2 VALUES [("2014-06-01", "100", "200"), ("2014-07-01", "100", "300")) ) DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "storage_medium" = "SSD" );
创建一个 olap 表,使用 List 分区,使用Hash分桶,默认使用列存,
相同key的记录同时存在,设置初始存储介质和冷却时间
1)单列分区
CREATE TABLE example_db.table_list ( k1 INT, k2 VARCHAR(128), k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE=olap DUPLICATE KEY(k1, k2, k3) PARTITION BY LIST (k1) ( PARTITION p1 VALUES IN ("1", "2", "3"), PARTITION p2 VALUES IN ("4", "5", "6"), PARTITION p3 VALUES IN ("7", "8", "9") ) DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "storage_medium" = "SSD", "storage_cooldown_time" = "2022-06-04 00:00:00" );
说明:
这个语句会将数据划分成如下3个分区:
("1", "2", "3")
("4", "5", "6")
("7", "8", "9")
不在这些分区枚举值内的数据将视为非法数据被过滤
CREATE TABLE example_db.table_list ( k1 INT, k2 VARCHAR(128), k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE=olap DUPLICATE KEY(k1, k2, k3) PARTITION BY LIST (k1, k2) ( PARTITION p1 VALUES IN (("1","beijing"), ("1", "shanghai")), PARTITION p2 VALUES IN (("2","beijing"), ("2", "shanghai")), PARTITION p3 VALUES IN (("3","beijing"), ("3", "shanghai")) ) DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "storage_medium" = "SSD", "storage_cooldown_time" = "2022-06-04 00:00:00" );
说明:
这个语句会将数据划分成如下3个分区:
(("1","beijing"), ("1", "shanghai"))
(("2","beijing"), ("2", "shanghai"))
(("3","beijing"), ("3", "shanghai"))
不在这些分区枚举值内的数据将视为非法数据被过滤
创建一个 mysql 表
5.1 直接通过外表信息创建mysql表
CREATE EXTERNAL TABLE example_db.table_mysql ( k1 DATE, k2 INT, k3 SMALLINT, k4 VARCHAR(2048), k5 DATETIME ) ENGINE=mysql PROPERTIES ( "host" = "127.0.0.1", "port" = "8239", "user" = "mysql_user", "password" = "mysql_passwd", "database" = "mysql_db_test", "table" = "mysql_table_test" )
5.2 通过External Catalog Resource创建mysql表
CREATE EXTERNAL RESOURCE "mysql_resource"
PROPERTIES
(
"type" = "odbc_catalog",
"user" = "mysql_user",
"password" = "mysql_passwd",
"host" = "127.0.0.1",
"port" = "8239"
);
CREATE EXTERNAL TABLE example_db.table_mysql
(
k1 DATE,
k2 INT,
k3 SMALLINT,
k4 VARCHAR(2048),
k5 DATETIME
)
ENGINE=mysql
PROPERTIES
(
"odbc_catalog_resource" = "mysql_resource",
"database" = "mysql_db_test",
"table" = "mysql_table_test"
)
CREATE EXTERNAL TABLE example_db.table_broker ( k1 DATE, k2 INT, k3 SMALLINT, k4 VARCHAR(2048), k5 DATETIME ) ENGINE=broker PROPERTIES ( "broker_name" = "hdfs", "path" = "hdfs://hdfs_host:hdfs_port/data1,hdfs://hdfs_host:hdfs_port/data2,hdfs://hdfs_host:hdfs_port/data3%2c4", "column_separator" = "|", "line_delimiter" = "\n" ) BROKER PROPERTIES ( "username" = "hdfs_user", "password" = "hdfs_password" )
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 HLL HLL_UNION,
v2 HLL HLL_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
CREATE TABLE example_db.example_table
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 BITMAP BITMAP_UNION,
v2 BITMAP BITMAP_UNION
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
DISTRIBUTED BY HASH(k1) BUCKETS 32;
CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "colocate_with" = "t1" ); CREATE TABLE `t2` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "" ) ENGINE=OLAP DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "colocate_with" = "t1" );
CREATE EXTERNAL TABLE example_db.table_broker (
k1 DATE
)
ENGINE=broker
PROPERTIES (
"broker_name" = "bos",
"path" = "bos://my_bucket/input/file",
)
BROKER PROPERTIES (
"bos_endpoint" = "http://bj.bcebos.com",
"bos_accesskey" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
"bos_secret_accesskey"="yyyyyyyyyyyyyyyyyyyy"
)
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 CHAR(10) REPLACE,
v2 INT SUM,
INDEX k1_idx (k1) USING BITMAP COMMENT 'xxxxxx'
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32;
2020-01-08
,则会创建分区名为p20200108
, p20200109
, p20200110
, p20200111
的分区. 分区范围分别为:[types: [DATE]; keys: [2020-01-08]; ‥types: [DATE]; keys: [2020-01-09]; )
[types: [DATE]; keys: [2020-01-09]; ‥types: [DATE]; keys: [2020-01-10]; )
[types: [DATE]; keys: [2020-01-10]; ‥types: [DATE]; keys: [2020-01-11]; )
[types: [DATE]; keys: [2020-01-11]; ‥types: [DATE]; keys: [2020-01-12]; )
CREATE TABLE example_db.dynamic_partition ( k1 DATE, k2 INT, k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) ENGINE=olap DUPLICATE KEY(k1, k2, k3) PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "storage_medium" = "SSD", "dynamic_partition.time_unit" = "DAY", "dynamic_partition.start" = "-3", "dynamic_partition.end" = "3", "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "32" );
CREATE TABLE example_db.rollup_index_table ( event_day DATE, siteid INT DEFAULT '10', citycode SMALLINT, username VARCHAR(32) DEFAULT '', pv BIGINT SUM DEFAULT '0' ) AGGREGATE KEY(event_day, siteid, citycode, username) DISTRIBUTED BY HASH(siteid) BUCKETS 10 rollup ( r1(event_day,siteid), r2(event_day,citycode), r3(event_day) ) PROPERTIES("replication_num" = "3");
CREATE TABLE example_db.table_hash
(
k1 TINYINT,
k2 DECIMAL(10, 2) DEFAULT "10.5",
v1 CHAR(10) REPLACE,
v2 INT SUM,
INDEX k1_idx (k1) USING BITMAP COMMENT 'xxxxxx'
)
ENGINE=olap
AGGREGATE KEY(k1, k2)
COMMENT "my first doris table"
DISTRIBUTED BY HASH(k1) BUCKETS 32
PROPERTIES ("in_memory"="true");
CREATE TABLE example_db.table_hive
(
k1 TINYINT,
k2 VARCHAR(50),
v INT
)
ENGINE=hive
PROPERTIES
(
"database" = "hive_db_name",
"table" = "hive_table_name",
"hive.metastore.uris" = "thrift://127.0.0.1:9083"
);
CREATE TABLE example_db.table_hash ( k1 TINYINT, k2 DECIMAL(10, 2) DEFAULT "10.5" ) DISTRIBUTED BY HASH(k1) BUCKETS 32 PROPERTIES ( "replication_allocation"="tag.location.group_a:1, tag.location.group_b:2" ); CREATE TABLE example_db.dynamic_partition ( k1 DATE, k2 INT, k3 SMALLINT, v1 VARCHAR(2048), v2 DATETIME DEFAULT "2014-02-04 15:36:00" ) PARTITION BY RANGE (k1) () DISTRIBUTED BY HASH(k2) BUCKETS 32 PROPERTIES( "dynamic_partition.time_unit" = "DAY", "dynamic_partition.start" = "-3", "dynamic_partition.end" = "3", "dynamic_partition.prefix" = "p", "dynamic_partition.buckets" = "32", "dynamic_partition."replication_allocation" = "tag.location.group_a:3" );
Doris 的建表是一个同步命令,命令返回成功,即表示建表成功。
Doris 支持支持单分区和复合分区两种建表方式。
1)复合分区:既有分区也有分桶
第一级称为 Partition,即分区。用户可以指定某一维度列作为分区列(当前只支持整型和时间类型的列),并指定每个分区的取值范围。
第二级称为 Distribution,即分桶。用户可以指定一个或多个维度列以及桶数对数据进行 HASH 分布。
2)单分区:只做 HASH 分布,即只分桶。
CREATE TABLE IF NOT EXISTS example_db.expamle_range_tbl ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间" ) ENGINE=olap AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) PARTITION BY RANGE(`date`) ( PARTITION `p201701` VALUES LESS THAN ("2017-02-01"), PARTITION `p201702` VALUES LESS THAN ("2017-03-01"), PARTITION `p201703` VALUES LESS THAN ("2017-04-01") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 PROPERTIES ( "replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2018-01-01 12:00:00" );
CREATE TABLE IF NOT EXISTS example_db.expamle_list_tbl ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME NOT NULL COMMENT "数据灌入的时间戳", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时 间" ) ENGINE=olap AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) PARTITION BY LIST(`city`) ( PARTITION `p_cn` VALUES IN ("Beijing", "Shanghai", "Hong Kong"), PARTITION `p_usa` VALUES IN ("New York", "San Francisco"), PARTITION `p_jp` VALUES IN ("Tokyo") ) DISTRIBUTED BY HASH(`user_id`) BUCKETS 16 PROPERTIES ( "replication_num" = "3", "storage_medium" = "SSD", "storage_cooldown_time" = "2018-01-01 12:00:00" );
以 AGGREGATE KEY 数据模型为例进行说明。更多数据模型参阅 Doris 数据模型。
列的基本类型,可以通过在 mysql-client 中执行 HELP CREATE TABLE; 查看。
AGGREGATE KEY 数据模型中,所有没有指定聚合方式(SUM、REPLACE、MAX、MIN)的列视为 Key 列。而其余则为 Value 列。
定义列时,可参照如下建议:
➢ Key 列必须在所有 Value 列之前
➢ 尽量选择整型类型。因为整型类型的计算和查找比较效率远高于字符串。
➢ 对于不同长度的整型类型的选择原则,遵循够用即可。
➢ 对于 VARCHAR 和 STRING 类型的长度,遵循 够用即可
➢ 所有列的总字节长度(包括 Key 和 Value)不能超过 100KB。
Doris 支持两层的数据划分。第一层是 Partition,支持 Range 和 List 的划分方式。第二
层是 Bucket(Tablet),仅支持 Hash 的划分方式。也可以仅使用一层分区。使用一层分区时,只支持 Bucket 划分。
也可以仅使用一层分区。使用一层分区时,只支持 Bucket 划分。
➢ Partition 列可以指定一列或多列。分区类必须为 KEY 列。多列分区的使用方式在后面介绍。
➢ 不论分区列是什么类型,在写分区值时,都需要加双引号。
➢ 分区数量理论上没有上限。
➢ 当不使用 Partition 建表时,系统会自动生成一个和表名同名的,全值范围的Partition。该 Partition 对用户不可见,并且不可删改。
1) Range 分区
分区列通常为时间列,以方便的管理新旧数据。不可添加范围重叠的分区。
Partition 指定范围的方式
⚫ VALUES LESS THAN (…) 仅指定上界,系统会将前一个分区的上界作为该分区的
下界,生成一个左闭右开的区间。分区的删除不会改变已存在分区的范围。删除分区可能出现空洞。
VALUES […) 指定同时指定上下界,生成一个左闭右开的区间。
通过 VALUES […) 同时指定上下界比较容易理解。这里举例说明,当使用 VALUES LESS THAN (…) 语句进行分区的增删操作时,分区范围的变化情况:
(1)如上 5.3.1 Range Partition示例,当建表完成后,会自动生成如下 3 个分区:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201703: [2017-03-01, 2017-04-01)
(2)增加一个分区 p201705 VALUES LESS THAN (“2017-06-01”),分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201703: [2017-03-01, 2017-04-01)
p201705: [2017-04-01, 2017-06-01)
(3)此时删除分区 p201703,则分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
注意到 p201702 和 p201705 的分区范围并没有发生变化,而这两个分区之间,出现了
一个空洞:[2017-03-01, 2017-04-01)。即如果导入的数据范围在这个空洞范围内,是无法导
入的。
(4)继续删除分区 p201702,分区结果如下:
p201701: [MIN_VALUE, 2017-02-01)
p201705: [2017-04-01, 2017-06-01)
空洞范围变为:[2017-02-01, 2017-04-01)
(5)现在增加一个分区 p201702new VALUES LESS THAN (“2017-03-01”),分区结果
如下:
p201701: [MIN_VALUE, 2017-02-01)
p201702new: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
可以看到空洞范围缩小为:[2017-03-01, 2017-04-01)
(6)现在删除分区 p201701,并添加分区 p201612 VALUES LESS THAN (“2017-01-01”),
分区结果如下:
p201612: [MIN_VALUE, 2017-01-01)
p201702new: [2017-02-01, 2017-03-01)
p201705: [2017-04-01, 2017-06-01)
即出现了一个新的空洞:[2017-01-01, 2017-02-01)
2)List 分区
分区列支 持 BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, DATE,DATETIME, CHAR, VARCHAR 数据类型,分区值为枚举值。只有当数据为目标分区枚举值其中之一时,才可以命中分区。不可添加范围重叠的分区。
Partition 支持通过 VALUES IN (…) 来指定每个分区包含的枚举值。下面通过示例说明,
进行分区的增删操作时,分区的变化。
(1)如上 example_list_tbl 示例,当建表完成后,会自动生成如下 3 个分区:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_jp: ("Tokyo")
(2)增加一个分区 p_uk VALUES IN (“London”),分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_jp: ("Tokyo")
p_uk: ("London")
(3)删除分区 p_jp,分区结果如下:
p_cn: ("Beijing", "Shanghai", "Hong Kong")
p_usa: ("New York", "San Francisco")
p_uk: ("London")
(1)如果使用了 Partition,则 DISTRIBUTED … 语句描述的是数据在各个分区内的划分规则。如果不使用 Partition,则描述的是对整个表的数据的划分规则。
(2)分桶列可以是多列,但必须为 Key 列。分桶列可以和 Partition 列相同或不同。
(3)分桶列的选择,是在 查询吞吐 和 查询并发 之间的一种权衡:
① 如果选择多个分桶列,则数据分布更均匀。
如果一个查询条件不包含所有分桶列的等值条件,那么该查询会触发所有分桶同时扫描,这样查询的吞吐会增加,单个查询的延迟随之降低。这个方式适合大吞吐低并发的查询场景。
② 如果仅选择一个或少数分桶列,则对应的点查询可以仅触发一个分桶扫描。此时,当多个点查询并发时,这些查询有较大的概率分别触发不同的分桶扫描,各个查询之间的 IO 影响较小(尤其当不同桶分布在不同磁盘上时),所以这种方式适合高并发的点查询场景。
(4)分桶的数量理论上没有上限。
以下场景推荐使用复合分区
(1)有时间维度或类似带有有序值的维度,可以以这类维度列作为分区列。分区粒度
可以根据导入频次、分区数据量等进行评估。
(2)历史数据删除需求:如有删除历史数据的需求(比如仅保留最近 N 天的数据)。
使用复合分区,可以通过删除历史分区来达到目的。也可以通过在指定分区内发送 DELETE
语句进行数据删除。
(3)解决数据倾斜问题:每个分区可以单独指定分桶数量。如按天分区,当每天的数
据量差异很大时,可以通过指定分区的分桶数,合理划分不同分区的数据,分桶列建议选择
区分度大的列。
Doris 支持指定多列作为分区列,示例如下:
1)Range 分区
PARTITION BY RANGE(`date`, `id`)
(
PARTITION `p201701_1000` VALUES LESS THAN ("2017-02-01", "1000"),
PARTITION `p201702_2000` VALUES LESS THAN ("2017-03-01", "2000"),
PARTITION `p201703_all` VALUES LESS THAN ("2017-04-01")
)
指定 `date`(DATE 类型) 和 `id`(INT 类型) 作为分区列。以上示例最终得到的分区如下:
p201701_1000: [(MIN_VALUE, MIN_VALUE), ("2017-02-01", "1000") )
p201702_2000: [("2017-02-01", "1000"), ("2017-03-01", "2000") )
p201703_all: [("2017-03-01", "2000"), ("2017-04-01", MIN_VALUE))
注意,最后一个分区用户缺省只指定了 date
列的分区值,所以 id
列的分区值会默认填充 MIN_VALUE
。当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:
数据 --> 分区
2017-01-01, 200 --> p201701_1000
2017-01-01, 2000 --> p201701_1000
2017-02-01, 100 --> p201701_1000
2017-02-01, 2000 --> p201702_2000
2017-02-15, 5000 --> p201702_2000
2017-03-01, 2000 --> p201703_all
2017-03-10, 1 --> p201703_all
2017-04-01, 1000 --> 无法导入
2017-05-01, 1000 --> 无法导入
2)List 分区
PARTITION BY LIST(`id`, `city`)
(
PARTITION `p1_city` VALUES IN (("1", "Beijing"), ("1",
"Shanghai")),
PARTITION `p2_city` VALUES IN (("2", "Beijing"), ("2",
"Shanghai")),
PARTITION `p3_city` VALUES IN (("3", "Beijing"), ("3",
"Shanghai"))
)
指定 id
(INT 类型) 和 city
(VARCHAR 类型) 作为分区列。最终得到的分区如下:
p1_city: [("1", "Beijing"), ("1", "Shanghai")]
p2_city: [("2", "Beijing"), ("2", "Shanghai")]
p3_city: [("3", "Beijing"), ("3", "Shanghai")]
当用户插入数据时,分区列值会按照顺序依次比较,最终得到对应的分区。举例如下:数据 —> 分区
1, Beijing ---> p1_city
1, Shanghai ---> p1_city
2, Shanghai ---> p2_city
3, Beijing ---> p3_city
1, Tianjin ---> 无法导入
4, Beijing ---> 无法导入
在建表语句的最后 PROPERTIES 中,可以指定以下两个参数:
每个 Tablet 的副本数量。默认为 3,建议保持默认即可。在建表语句中,所有 Partition中的 Tablet 副本数量统一指定。而在增加新分区时,可以单独指定新分区中 Tablet 的副本数量。
副本数量可以在运行时修改。强烈建议保持奇数。
最大副本数量取决于集群中独立 IP 的数量(注意不是 BE 数量)。Doris 中副本分布的原则是,不允许同一个Tablet 的副本分布在同一台物理机上,而识别物理机即通过 IP。所以,即使在同一台物理机上部署了 3 个或更多 BE 实例,如果这些 BE 的 IP 相同,则依然只能设置副本数为 1。
对于一些小,并且更新不频繁的维度表,可以考虑设置更多的副本数。这样在 Join 查询时,可以有更大的概率进行本地数据 Join。
BE 的数据存储目录可以显式的指定为 SSD 或者 HDD(通过 .SSD 或者 .HDD 后缀区分)。建表时,可以统一指定所有 Partition 初始存储的介质。注意,后缀作用是显式指定磁盘介质,而不会检查是否与实际介质类型相符。
默认初始存储介质可通过 fe 的配置文件 fe.conf 中指定 default_storage_medium=xxx,
如果没有指定,则默认为 HDD。如果指定为 SSD,则数据初始存放在 SSD 上。
如果没有指定 storage_cooldown_time,则默认 30 天后,数据会从 SSD 自动迁移到 HDD上。如果指定了 storage_cooldown_time,则在到达 storage_cooldown_time 时间后,数据才会迁移。
注意,当指定 storage_medium 时,如果 FE 参数 enable_strict_storage_medium_check 为False 该参数只是一个“尽力而为”的设置。即使集群内没有设置 SSD 存储介质,也不会报错,而是自动存储在可用的数据目录中。 同样,如果 SSD 介质不可访问、空间不足,都可能导致数据初始直接存储在其他可用介质上。而数据到期迁移到 HDD 时,如果 HDD 介质不可访问 、 空间不足 , 也 可能迁移失败 ( 但是会不断尝试 ) 。 如 果 FE参数enable_strict_storage_medium_check 为 True 则当集群内没有设置 SSD 存储介质时,会报错Failed to find enough host in all backends with storage medium is SSD。
本示例中,ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。而 Doris 本身并不创建、管理和存储任何非 olap ENGINE 类型的表和数据。
Doris 的数据模型主要分为 3 类:Aggregate、Uniq、Duplicate
表中的列按照是否设置了 AggregationType,分为 Key(维度列)和 Value(指标列)。
没有设置 AggregationType 的称为 Key,设置了 AggregationType 的称为 Value。
当我们导入数据时,对于 Key 列相同的行会聚合成一行,而 Value 列会按照设置的AggregationType 进行聚合。
AggregationType 目前有以下四种聚合方式:
➢ SUM:求和,多行的 Value 进行累加。
➢ REPLACE:替代,下一批数据中的 Value 会替换之前导入过的行中的 Value。
REPLACE_IF_NOT_NULL :当遇到 null 值则不更新。
➢ MAX:保留最大值。
➢ MIN:保留最小值。
数据的聚合,在 Doris 中有如下三个阶段发生:
(1)每一批次数据导入的 ETL 阶段。该阶段会在每一批次导入的数据内部进行聚合。
(2)底层 BE 进行数据 Compaction 的阶段。该阶段,BE 会对已导入的不同批次的数据进行进一步的聚合。
(3)数据查询阶段。在数据查询时,对于查询涉及到的数据,会进行对应的聚合。数据在不同时间,可能聚合的程度不一致。比如一批数据刚导入时,可能还未与之前已存在的数据进行聚合。但是对于用户而言,用户只能查询到聚合后的数据。即不同的聚合程度对于用户查询而言是透明的。用户需始终认为数据以最终的完成的聚合程度存在,而不应假设某些聚合还未发生。(可参阅聚合模型的局限性一节获得更多详情。)
1)建表 CREATE TABLE IF NOT EXISTS test_db.example_site_visit ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `last_visit_date_not_null` DATETIME REPLACE_IF_NOT_NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间" ) AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10; 2)插入数据 insert into test_db.example_site_visit values (10000,'2017-10-01','北京',20,0,'2017-10-01 06:00:00','2017-10-0106:00:00',20,10,10), (10000,'2017-10-01','北京',20,0,'2017-10-01 07:00:00','2017-10-0107:00:00',15,2,2), (10001,'2017-10-01','北京',30,1,'2017-10-01 17:05:45','2017-10-0107:00:00',2,22,22), (10002,'2017-10-02',' 上 海 ',20,1,'2017-10-0212:59:12',null,200,5,5), (10003,'2017-10-02','广州',32,0,'2017-10-02 11:20:00','2017-10-0211:20:00',30,11,11), (10004,'2017-10-01','深圳',35,0,'2017-10-01 10:00:15','2017-10-0110:00:15',100,3,3), (10004,'2017-10-03','深圳',35,0,'2017-10-03 10:20:22','2017-10-03 10:20:22',11,6,6);
注意:Insert into 单条数据这种操作在 Doris 里只能演示不能在生产使用,会引发写阻塞。
1)建表
CREATE TABLE IF NOT EXISTS test_db.example_site_visit2 ( `user_id` LARGEINT NOT NULL COMMENT "用户 id", `date` DATE NOT NULL COMMENT "数据灌入日期时间", `timestamp` DATETIME COMMENT "数据灌入时间,精确到秒", `city` VARCHAR(20) COMMENT "用户所在城市", `age` SMALLINT COMMENT "用户年龄", `sex` TINYINT COMMENT "用户性别", `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间", `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费", `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间", `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时 间" ) AGGREGATE KEY(`user_id`, `date`, `timestamp`, `city`, `age`, `sex`) DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
2)插入数据
insert into test_db.example_site_visit2 values(10000,'2017-10-
01','2017-10-01 08:00:05',' 北 京 ',20,0,'2017-10-01
06:00:00',20,10,10),\
(10000,'2017-10-01','2017-10-01 09:00:05','北京',20,0,'2017-10-01
07:00:00',15,2,2),\
(10001,'2017-10-01','2017-10-01 18:12:10','北京',30,1,'2017-10-01
17:05:45',2,22,22),\
(10002,'2017-10-02','2017-10-02 13:10:00','上海',20,1,'2017-10-02
12:59:12',200,5,5),\
(10003,'2017-10-02','2017-10-02 13:15:00','广州',32,0,'2017-10-02
11:20:00',30,11,11),\
(10004,'2017-10-01','2017-10-01 12:12:48','深圳',35,0,'2017-10-01
10:00:15',100,3,3),\
(10004,'2017-10-03','2017-10-03 12:38:20','深圳',35,0,'2017-10-03
10:20:22',11,6,6);
3)查看表
select * from test_db.example_site_visit2;
存储的数据,和导入数据完全一样,没有发生任何聚合。这是因为,这批数据中,因为加入了 timestamp 列,所有行的 Key 都不完全相同。也就是说,只要保证导入的数据中,每一行的 Key 都不完全相同,那么即使在聚合模型下,Doris 也可以保存完整的明细数据。
1)往实例一中继续插入数据
insert into test_db.example_site_visit values(10004,'2017-10-03','深圳',35,0,'2017-10-03 11:22:00',null,44,19,19),
(10005,'2017-10-03','长沙',29,1,'2017-10-03 18:11:02','2017-10-03 18:11:02',3,1,1);
2)查看表
select * from test_db.example_site_visit;
可以看到,用户 10004 的已有数据和新导入的数据发生了聚合。同时新增了 10005 用户的数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。