赞
踩
最近在使用starRocks,记录一些临时的操作技巧,防止遗忘。
CREATE TABLE IF NOT EXISTS ODS.T_TEST(
pk_day date,
pool_address string,
code string comment '唯一主键',
test1 string,
test2 string,
test3 string,
pk_year varchar(4),
pk_month varchar(7)
)
primary KEY(pk_day,pool_address,code)
PARTITION BY range(pk_day)
(
PARTITION p20230916 VALUES LESS THAN ("2023-09-16"),
PARTITION p20230917 VALUES LESS THAN ("2023-09-17"),
PARTITION p20230918 VALUES LESS THAN ("2023-09-18"),
PARTITION p20230919 VALUES LESS THAN ("2023-09-19"),
PARTITION p20230920 VALUES LESS THAN ("2023-09-20"),
PARTITION p20230921 VALUES LESS THAN ("2023-09-21")
)DISTRIBUTED BY HASH(pool_address)
PROPERTIES(
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.time_zone" = "UTC",
"dynamic_partition.buckets" = "4"
);
指定了动态分区表,主键的三个字段必须放到前面3个。
PARTITION BY range(pk_day)中间必须创建几个分区。
而且根据日期分区的字段必须设置为Date类型,即使该字段是"2023-09-20"这样的字符串类型。
数据来源Kafka:
# 创建导入脚本
CREATE ROUTINE LOAD ODS.TEST ON TEST
WHERE pk_day is not null and pk_day >= '2023-09-18'
PROPERTIES
(
"desired_concurrent_number"="12",
"format" ="json",
"jsonpaths" ="[\"$.pk_day\", \"$.pool_address\", \"$.code\", \"$.test1\", \"$.test2\", \"$.test3\", \"$.pk_year\",\"$.pk_month\"]"
)
FROM KAFKA
(
"kafka_broker_list" ="localhost:9092",
"kafka_topic" = "ods_test",
"property.kafka_default_offsets" = "OFFSET_BEGINNING",
"property.group.id" = "g1"
);
# 停止脚本
STOP ROUTINE LOAD FOR ODS.TEST;
如果该脚本需要修改,先停止脚本。
如果json字段和表的字段一致,则可以不使用jsonpaths属性一个个字段解析出来。
可以在on后面加筛选条件过滤部分数据。
# 查看在运行的脚本
show routine load;
# 查看分区
show partitions from ODS.TEST;
# 手工添加分区(必须先停止设置动态分区,然后才能添加)
ALTER TABLE ODS.TEST set("dynamic_partition.enable" = "false");
ALTER TABLE ODS.TEST ADD PARTITION p20230917 VALUES LESS THAN ("2023-09-17") DISTRIBUTED BY HASH(pool_address);
ALTER TABLE ODS.TEST set("dynamic_partition.enable" = "true");
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。