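A zipper table (拉链表) records history: every change to a record from its first appearance up to its current state. It therefore holds both the latest data for the current day and all earlier historical data.

A zipper table is a good fit when:

- the data volume is large;
- the business system does not keep historical data for long, so update operations have to be preserved on the big data platform;
- you need to look up a historical snapshot at a given point in time or over a time range;
- only a small proportion of records change, and they change infrequently.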
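
The data in some dimension tables is not static; it changes slowly over time (a slowly changing dimension). "Slowly" here is relative to fact tables, whose data changes faster than dimension data.

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |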
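
The user experience is poor.

This approach has a precondition: users do not care about changes to this data.

The zipper table is the typical example.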

Before the change:

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |

After the change:

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |
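
The before/after tables above keep the old row and add a new one. A minimal Python sketch of that idea follows; the function name `apply_change_as_new_row` and the English column keys are made up for illustration and are not taken from the original article.

```python
def apply_change_as_new_row(history, changes):
    """Append a new version of the latest row instead of overwriting it."""
    latest = history[-1]
    new_version = {**latest, **changes}
    if new_version == latest:
        return history              # nothing changed, so no new row is added
    return history + [new_version]


rows = [{"id": 9527, "user_id": 114, "user_name": "张三",
         "date_of_birth": "1988-09-08", "address": "北京市朝阳区"}]
rows = apply_change_as_new_row(
    rows, {"date_of_birth": "1992-09-08", "address": "北京市海淀区"})
```

Both versions share the same `id`, so nothing in the rows themselves says which one is current. That is the gap the validity dates of a zipper table fill.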
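
Do I have to add a new column for every column the user changes?

Before the change:

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |

After the change:

ID | User ID | User name | Date of birth | Date of birth 2 | Address | Current address |
---|---|---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 1992-09-08 | 北京市朝阳区 | 北京市海淀区 |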
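
To make the cost of the add-a-column approach concrete, here is a small Python sketch. The helper `apply_change_as_new_columns` and the `_2` suffix are assumptions for illustration only; the table above uses its own column names.

```python
def apply_change_as_new_columns(row, changes, suffix="_2"):
    """Keep each old value in place and store the changed value in an extra column."""
    widened = dict(row)
    for column, value in changes.items():
        if row.get(column) != value:
            widened[column + suffix] = value
    return widened


row = {"id": 9527, "user_id": 114, "user_name": "张三",
       "date_of_birth": "1988-09-08", "address": "北京市朝阳区"}
widened = apply_change_as_new_columns(
    row, {"date_of_birth": "1992-09-08", "address": "北京市海淀区"})
```

Every changed attribute widens the schema by one column, and a second change to the same attribute would need yet another column or would overwrite the `_2` value.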

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 |

ID | User ID | User name | Date of birth | Address |
---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 |

ID | User ID | User name | Date of birth | Address | Update time |
---|---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 |
9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 |

id | user_id | user_name | date_of_birth | address_of_birth | update_time | start_date | end_date |
---|---|---|---|---|---|---|---|
9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | 2023-05-01 | 2023-05-02 |
9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | 2023-05-02 | 2023-05-03 |
9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 | 2023-05-03 | 9999-12-31 |
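
In the last table above, each version ends on the date the next version starts, and the newest version ends at `9999-12-31`. A rough Python sketch of deriving those intervals is below. The names `build_chain` and `OPEN_END` are illustrative assumptions, and the dates are assumed to be ISO-formatted strings so that plain string sorting orders them correctly.

```python
OPEN_END = "9999-12-31"   # sentinel end date for the version that is still valid


def build_chain(versions):
    """versions: list of (start_date, row) for one id; returns rows with start/end dates."""
    ordered = sorted(versions, key=lambda v: v[0])
    chain = []
    for i, (start, row) in enumerate(ordered):
        # A version ends where the next one starts; the last one stays open.
        end = ordered[i + 1][0] if i + 1 < len(ordered) else OPEN_END
        chain.append({**row, "start_date": start, "end_date": end})
    return chain


chain = build_chain([
    ("2023-05-01", {"id": 9527, "date_of_birth": "1988-09-08", "address_of_birth": "北京市朝阳区"}),
    ("2023-05-02", {"id": 9527, "date_of_birth": "1992-09-08", "address_of_birth": "北京市朝阳区"}),
    ("2023-05-03", {"id": 9527, "date_of_birth": "1992-09-08", "address_of_birth": "北京市海淀区"}),
])
```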
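
Here we discuss a simple optimization, which involves the fields `start_date`, `end_date`, and `dp`.

`start_date` marks the point at which a record starts in the zipper table. It does not represent when the record was created, so it is not a business timestamp.

The states involved are `active`, `expired`, and `history` (archived).

step1: Prepare the warehouse tables. Here we process the data as an incremental zipper.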

    CREATE TABLE tmp.temp_ods_user (
        `id`               int    COMMENT 'id',
        `user_id`          int    COMMENT 'user id',
        `user_name`        string COMMENT 'user name',
        `date_of_birth`    string COMMENT 'date of birth',
        `address_of_birth` string COMMENT 'address of birth',
        `update_time`      string COMMENT 'update time'
    )
    COMMENT 'zipper table test: user info, ODS extraction layer'
    PARTITIONED BY (`dt` string COMMENT 'incremental extraction date')
    STORED AS ORC;

    CREATE TABLE tmp.temp_dw_user_chain (
        `start_date`       string COMMENT 'start date',
        `change_code`      string COMMENT 'MD5 of the tracked fields',
        `id`               int    COMMENT 'id',
        `user_id`          int    COMMENT 'user id',
        `user_name`        string COMMENT 'user name',
        `date_of_birth`    string COMMENT 'date of birth',
        `address_of_birth` string COMMENT 'address of birth',
        `update_time`      string COMMENT 'update time'
    )
    COMMENT 'zipper table test: user info, DW detail layer, zipper processing'
    PARTITIONED BY (
        `status`   string COMMENT 'status',
        `end_date` string COMMENT 'end date'
    )
    STORED AS ORC;
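
The `change_code` column holds an MD5 over the tracked fields, so two versions of a row can be compared with one string comparison. The Python sketch below only illustrates that idea. It joins the values with `_` the way the article's `concat_ws('_', ...)` call does, but `change_code()` and `TRACKED` are names chosen here, NULL handling is ignored, and the sketch is not claimed to reproduce the hash values Hive would produce.

```python
import hashlib

# Columns that take part in change detection (mirrors the column list in the DDL).
TRACKED = ["id", "user_id", "user_name", "date_of_birth", "address_of_birth", "update_time"]


def change_code(row, columns=TRACKED, sep="_"):
    """Join the tracked values with `sep` and return the MD5 hex digest."""
    joined = sep.join(str(row[c]) for c in columns)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


old = {"id": 9527, "user_id": 114, "user_name": "张三",
       "date_of_birth": "1988-09-08", "address_of_birth": "北京市朝阳区",
       "update_time": "2023-01-01 10:00:00"}
new = dict(old, date_of_birth="1992-09-08", update_time="2023-05-02 10:00:00")

changed = change_code(old) != change_code(new)
```

Comparing one digest instead of every column keeps the join condition short, at the cost of recomputing the hash on each run.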

    -- Simulate the first extraction into ODS
    INSERT OVERWRITE TABLE tmp.temp_ods_user PARTITION (dt = '2023-05-01')
    SELECT 9527 AS id,
           114 AS user_id,
           '张三' AS user_name,
           '1988-09-08' AS date_of_birth,
           '北京市朝阳区' AS address_of_birth,
           '2023-01-01 10:00:00' AS update_time;

    -- Simulate the first extraction: zipper, expired partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'expired', end_date = '2023-05-01')
    SELECT CASE WHEN h.change_code <> c.change_code THEN h.start_date       ELSE e.start_date       END AS start_date,
           CASE WHEN h.change_code <> c.change_code THEN h.change_code      ELSE e.change_code      END AS change_code,
           CASE WHEN h.change_code <> c.change_code THEN h.id               ELSE e.id               END AS id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_id          ELSE e.user_id          END AS user_id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_name        ELSE e.user_name        END AS user_name,
           CASE WHEN h.change_code <> c.change_code THEN h.date_of_birth    ELSE e.date_of_birth    END AS date_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.address_of_birth ELSE e.address_of_birth END AS address_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.update_time      ELSE e.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   -- one row per id; ordering by update_time is an assumption,
                   -- the original sorted by id only
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       -- concat_ws expects string arguments, so the int columns are cast
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-01' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id
    FULL JOIN (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'expired' AND end_date = '2023-05-01'
    ) e  -- rows already expired today
      ON e.id = c.id
    WHERE h.id IS NOT NULL
      AND c.id IS NOT NULL
      AND (h.change_code <> c.change_code
           OR (h.change_code = c.change_code AND e.id IS NOT NULL));

    -- Simulate the first extraction: zipper, active partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'active', end_date = '9999-12-31')
    SELECT if(h.id IS NULL OR (c.id IS NOT NULL AND h.change_code <> c.change_code),
              '2023-05-01', h.start_date) AS start_date,
           CASE WHEN h.id IS NULL THEN c.change_code
                WHEN h.id IS NOT NULL AND c.id IS NOT NULL AND h.change_code <> c.change_code THEN c.change_code
                ELSE h.change_code END AS change_code,
           CASE WHEN c.id IS NOT NULL THEN c.id               ELSE h.id               END AS id,
           CASE WHEN c.id IS NOT NULL THEN c.user_id          ELSE h.user_id          END AS user_id,
           CASE WHEN c.id IS NOT NULL THEN c.user_name        ELSE h.user_name        END AS user_name,
           CASE WHEN c.id IS NOT NULL THEN c.date_of_birth    ELSE h.date_of_birth    END AS date_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.address_of_birth ELSE h.address_of_birth END AS address_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.update_time      ELSE h.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-01' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id;

    -- The zipper job may be run by hand several times in one day. On the first run of
    -- the day the EXPIRED partition is empty, and the old versions of updated rows are
    -- written into it. On a manual rerun the same day the EXPIRED partition already has
    -- data, and that data is written straight back into the EXPIRED partition.
    -- The zipper SQL therefore has to read the EXPIRED partition. On a same-day rerun the
    -- ACTIVE partition has already been updated and no longer reflects yesterday's state,
    -- so without reusing the rows already in the EXPIRED partition the rerun would empty it.
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|---|---|---|---|---|---|---|---|---|
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | active | 9999-12-31 |
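
The two `insert overwrite` statements above amount to one merge step: compare the previous active rows with today's increment, close the rows whose content changed, and on a same-day rerun carry over what was already expired. The Python sketch below mirrors that logic under simplifying assumptions: each partition is modeled as a dict keyed by `id`, and `zipper_step` and `change_code` are names invented for this illustration. It is a reading aid, not a replacement for the SQL.

```python
from hashlib import md5

OPEN_END = "9999-12-31"
TRACKED = ("id", "user_id", "user_name", "date_of_birth", "address_of_birth", "update_time")


def change_code(row):
    """MD5 over the tracked columns, joined with '_'."""
    return md5("_".join(str(row[c]) for c in TRACKED).encode("utf-8")).hexdigest()


def zipper_step(active, expired_today, increment, run_date):
    """Return (new_active, new_expired_today) for one run of the job.

    active:        {id: row} left by the previous run
    expired_today: {id: row} already in today's expired partition (empty on the first run)
    increment:     {id: row} extracted today, one row per id
    """
    new_active, new_expired = {}, {}
    for key in active.keys() | increment.keys():
        h, c = active.get(key), increment.get(key)
        if c is None:                          # not in today's increment: keep as is
            new_active[key] = h
            continue
        code = change_code(c)
        if h is None or h["change_code"] != code:
            if h is not None:                  # the old version is closed today
                new_expired[key] = dict(h, end_date=run_date)
            new_active[key] = dict(c, start_date=run_date, change_code=code, end_date=OPEN_END)
        else:                                  # same content as the active row
            new_active[key] = h
            if key in expired_today:           # rerun: keep what was expired earlier today
                new_expired[key] = expired_today[key]
    return new_active, new_expired


day1 = {9527: {"id": 9527, "user_id": 114, "user_name": "张三",
               "date_of_birth": "1988-09-08", "address_of_birth": "北京市朝阳区",
               "update_time": "2023-01-01 10:00:00"}}
day2 = {9527: dict(day1[9527], date_of_birth="1992-09-08", update_time="2023-05-02 10:00:00")}

active, expired = zipper_step({}, {}, day1, "2023-05-01")
first_start = active[9527]["start_date"]
active, expired = zipper_step(active, {}, day2, "2023-05-02")
# Rerun on the same day: the row expired by the first run must survive.
active2, expired2 = zipper_step(active, expired, day2, "2023-05-02")
```

The rerun branch is the reason the SQL joins against the expired partition: once the active rows already match the increment, the expired rows can only come from what the first run of the day wrote.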

    INSERT OVERWRITE TABLE tmp.temp_ods_user PARTITION (dt = '2023-05-02')
    SELECT 9527 AS id,
           114 AS user_id,
           '张三' AS user_name,
           '1992-09-08' AS date_of_birth,
           '北京市朝阳区' AS address_of_birth,
           '2023-05-02 10:00:00' AS update_time;

    -- Simulate the first change extraction: zipper, expired partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'expired', end_date = '2023-05-02')
    SELECT CASE WHEN h.change_code <> c.change_code THEN h.start_date       ELSE e.start_date       END AS start_date,
           CASE WHEN h.change_code <> c.change_code THEN h.change_code      ELSE e.change_code      END AS change_code,
           CASE WHEN h.change_code <> c.change_code THEN h.id               ELSE e.id               END AS id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_id          ELSE e.user_id          END AS user_id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_name        ELSE e.user_name        END AS user_name,
           CASE WHEN h.change_code <> c.change_code THEN h.date_of_birth    ELSE e.date_of_birth    END AS date_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.address_of_birth ELSE e.address_of_birth END AS address_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.update_time      ELSE e.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   -- one row per id; ordering by update_time is an assumption
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       -- concat_ws expects string arguments, so the int columns are cast
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-02' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id
    FULL JOIN (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'expired' AND end_date = '2023-05-02'
    ) e  -- rows already expired today
      ON e.id = c.id
    WHERE h.id IS NOT NULL
      AND c.id IS NOT NULL
      AND (h.change_code <> c.change_code
           OR (h.change_code = c.change_code AND e.id IS NOT NULL));

    -- Simulate the first change extraction: zipper, active partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'active', end_date = '9999-12-31')
    SELECT if(h.id IS NULL OR (c.id IS NOT NULL AND h.change_code <> c.change_code),
              '2023-05-02', h.start_date) AS start_date,
           CASE WHEN h.id IS NULL THEN c.change_code
                WHEN h.id IS NOT NULL AND c.id IS NOT NULL AND h.change_code <> c.change_code THEN c.change_code
                ELSE h.change_code END AS change_code,
           CASE WHEN c.id IS NOT NULL THEN c.id               ELSE h.id               END AS id,
           CASE WHEN c.id IS NOT NULL THEN c.user_id          ELSE h.user_id          END AS user_id,
           CASE WHEN c.id IS NOT NULL THEN c.user_name        ELSE h.user_name        END AS user_name,
           CASE WHEN c.id IS NOT NULL THEN c.date_of_birth    ELSE h.date_of_birth    END AS date_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.address_of_birth ELSE h.address_of_birth END AS address_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.update_time      ELSE h.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-02' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id;
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|---|---|---|---|---|---|---|---|---|
2023-05-02 | ee3915fc4f4ecad9ea1570e391b4e | 9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | active | 9999-12-31 |
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | expired | 2023-05-02 |

    INSERT OVERWRITE TABLE tmp.temp_ods_user PARTITION (dt = '2023-05-03')
    SELECT 9527 AS id,
           114 AS user_id,
           '张三' AS user_name,
           '1992-09-08' AS date_of_birth,
           '北京市海淀区' AS address_of_birth,
           '2023-05-03 10:00:00' AS update_time;

    -- Simulate the second change extraction: zipper, expired partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'expired', end_date = '2023-05-03')
    SELECT CASE WHEN h.change_code <> c.change_code THEN h.start_date       ELSE e.start_date       END AS start_date,
           CASE WHEN h.change_code <> c.change_code THEN h.change_code      ELSE e.change_code      END AS change_code,
           CASE WHEN h.change_code <> c.change_code THEN h.id               ELSE e.id               END AS id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_id          ELSE e.user_id          END AS user_id,
           CASE WHEN h.change_code <> c.change_code THEN h.user_name        ELSE e.user_name        END AS user_name,
           CASE WHEN h.change_code <> c.change_code THEN h.date_of_birth    ELSE e.date_of_birth    END AS date_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.address_of_birth ELSE e.address_of_birth END AS address_of_birth,
           CASE WHEN h.change_code <> c.change_code THEN h.update_time      ELSE e.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   -- one row per id; ordering by update_time is an assumption
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       -- concat_ws expects string arguments, so the int columns are cast
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-03' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id
    FULL JOIN (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'expired' AND end_date = '2023-05-03'
    ) e  -- rows already expired today
      ON e.id = c.id
    WHERE h.id IS NOT NULL
      AND c.id IS NOT NULL
      AND (h.change_code <> c.change_code
           OR (h.change_code = c.change_code AND e.id IS NOT NULL));

    -- Simulate the second change extraction: zipper, active partition
    INSERT OVERWRITE TABLE tmp.temp_dw_user_chain PARTITION (status = 'active', end_date = '9999-12-31')
    SELECT if(h.id IS NULL OR (c.id IS NOT NULL AND h.change_code <> c.change_code),
              '2023-05-03', h.start_date) AS start_date,
           CASE WHEN h.id IS NULL THEN c.change_code
                WHEN h.id IS NOT NULL AND c.id IS NOT NULL AND h.change_code <> c.change_code THEN c.change_code
                ELSE h.change_code END AS change_code,
           CASE WHEN c.id IS NOT NULL THEN c.id               ELSE h.id               END AS id,
           CASE WHEN c.id IS NOT NULL THEN c.user_id          ELSE h.user_id          END AS user_id,
           CASE WHEN c.id IS NOT NULL THEN c.user_name        ELSE h.user_name        END AS user_name,
           CASE WHEN c.id IS NOT NULL THEN c.date_of_birth    ELSE h.date_of_birth    END AS date_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.address_of_birth ELSE h.address_of_birth END AS address_of_birth,
           CASE WHEN c.id IS NOT NULL THEN c.update_time      ELSE h.update_time      END AS update_time
    FROM (
        SELECT * FROM tmp.temp_dw_user_chain
        WHERE status = 'active' AND id IS NOT NULL
    ) h  -- active rows from the previous run
    FULL JOIN (
        SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time, change_code
        FROM (
            SELECT x.*,
                   row_number() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
            FROM (
                SELECT id, user_id, user_name, date_of_birth, address_of_birth, update_time,
                       md5(concat_ws('_', cast(id AS string), cast(user_id AS string), user_name,
                                     date_of_birth, address_of_birth, update_time)) AS change_code
                FROM tmp.temp_ods_user
                WHERE dt = '2023-05-03' AND id IS NOT NULL
            ) x
        ) t
        WHERE t.rn = 1
    ) c  -- the extracted increment
      ON h.id = c.id;
start_date | change_code | id | user_id | user_name | date_of_birth | address_of_birth | update_time | status | end_date |
---|---|---|---|---|---|---|---|---|---|
2023-05-01 | 4ac4beee336ffcc0c6afaab74ed6405f | 9527 | 114 | 张三 | 1988-09-08 | 北京市朝阳区 | 2023-01-01 10:00:00 | expired | 2023-05-02 |
2023-05-02 | ee3915fc4f4ecad9ea1570e391b4e | 9527 | 114 | 张三 | 1992-09-08 | 北京市朝阳区 | 2023-05-02 10:00:00 | expired | 2023-05-03 |
2023-05-03 | a68c8f5e5a4982d33ccc9834f6ddb96b | 9527 | 114 | 张三 | 1992-09-08 | 北京市海淀区 | 2023-05-03 10:00:00 | active | 9999-12-31 |
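
The final table above has a shape worth checking after every run: for one `id` the intervals join up without gaps, and exactly one row is `active` with the open end date. Below is a small Python sketch of such a check. The function `check_chain` is a hypothetical helper and only a few columns are modeled.

```python
def check_chain(rows, open_end="9999-12-31"):
    """Return a list of problems found in the zipper rows of a single id."""
    problems = []
    ordered = sorted(rows, key=lambda r: r["start_date"])
    # Each version must end exactly where the next one starts.
    for prev, cur in zip(ordered, ordered[1:]):
        if prev["end_date"] != cur["start_date"]:
            problems.append(
                f"gap or overlap between {prev['end_date']} and {cur['start_date']}")
    active = [r for r in ordered if r["status"] == "active"]
    if len(active) != 1:
        problems.append(f"expected exactly one active row, found {len(active)}")
    elif active[0]["end_date"] != open_end or active[0] is not ordered[-1]:
        problems.append("the active row must be the newest one and end at the open end date")
    return problems


rows = [
    {"id": 9527, "start_date": "2023-05-01", "end_date": "2023-05-02", "status": "expired"},
    {"id": 9527, "start_date": "2023-05-02", "end_date": "2023-05-03", "status": "expired"},
    {"id": 9527, "start_date": "2023-05-03", "end_date": "9999-12-31", "status": "active"},
]
problems = check_chain(rows)
```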
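
    -- Regular zipper table, without a HISTORY partition
    select *
    from dw_xxx_chain
    where status = 'active';

    -- Zipper table with carry-over, i.e. with a HISTORY partition
    select *
    from dw_xxx_chain
    where status in ('active', 'history');

    -- Rows valid on xxx_date (xxx_date is a placeholder)
    select *
    from dw_xxx_chain
    where start_date <= xxx_date
      and end_date > xxx_date;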
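
To see the queries above return something, the sketch below loads the three versions of user 9527 into an in-memory SQLite table and runs the same two predicates. SQLite stands in for Hive purely for convenience; the table name `dw_user_chain` and the helpers `snapshot` and `current` are assumptions, and partitions are modeled as ordinary columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dw_user_chain (
        id INTEGER, date_of_birth TEXT, address_of_birth TEXT,
        start_date TEXT, end_date TEXT, status TEXT)
""")
conn.executemany("INSERT INTO dw_user_chain VALUES (?, ?, ?, ?, ?, ?)", [
    (9527, "1988-09-08", "北京市朝阳区", "2023-05-01", "2023-05-02", "expired"),
    (9527, "1992-09-08", "北京市朝阳区", "2023-05-02", "2023-05-03", "expired"),
    (9527, "1992-09-08", "北京市海淀区", "2023-05-03", "9999-12-31", "active"),
])


def snapshot(as_of):
    """Rows valid on `as_of`: start_date is inclusive, end_date is exclusive."""
    return conn.execute(
        "SELECT id, date_of_birth, address_of_birth FROM dw_user_chain "
        "WHERE start_date <= ? AND end_date > ?", (as_of, as_of)).fetchall()


def current():
    """Latest state: the rows in the active partition."""
    return conn.execute(
        "SELECT id, date_of_birth, address_of_birth FROM dw_user_chain "
        "WHERE status = 'active'").fetchall()


on_may_2 = snapshot("2023-05-02")
latest = current()
```

Because `end_date` is exclusive, a date that falls exactly on a boundary matches only the version that starts on that day, so a snapshot never returns two versions of the same `id`.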