用户行为数据仓库
数据分层
在数据仓库中需要对于数据进行分层,原因如下
- 用空间换时间,通过大量的预处理来提升应用系统的用户体验(效率),因此数据仓库会存在大量冗余的数据;硬盘便宜,计算量贵
- 如果不分层的话,如果源业务系统的业务规则发生变化将会影响整个数据清洗过程,工作量巨大
- 不论是数据的异常还是数据的敏感性,使真实数据和统计数据解耦,防止原始数据的变化导致整个集群发生大的变动。
在这个数仓项目中将数据分为四层
- ODS层:原始数据层,存放原始数据,直接加载原始日志,数据,数据保持原样不做更改。
- DWD层:结构和原始数据表保持一致,是对ODS的数据进行清洗。
- DWS层:以DWD为基础,进行轻度汇总。
- ADS层:为各种应用提供数据。
在各个层次的数据前面加入层次名。
Hive仓库
Hive的安装基于mysql,在安装hive前需要安装mysql,
Mysql的安装
配置mysql的HA和主从。
Hive的安装
安装即可使用,需要将hive元数据配置到Mysql
- <?xml version="1.0"?>
- <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
- <configuration>
- <property>
- <name>javax.jdo.option.ConnectionURL</name>
- <value>jdbc:mysql://192.168.6.100:3306/metastore?createDatabaseIfNotExist=true</value>
- <description>JDBC connect string for a JDBC metastore</description>
- </property>
-
- <property>
- <name>javax.jdo.option.ConnectionDriverName</name>
- <value>com.mysql.jdbc.Driver</value>
- <description>Driver class name for a JDBC metastore</description>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionUserName</name>
- <value>root</value>
- <description>username to use against metastore database</description>
- </property>
- <property>
- <name>javax.jdo.option.ConnectionPassword</name>
- <value>123456</value>
- <description>password to use against metastore database</description>
- </property>
- </configuration>
Hive配置Tez,下载Tez的依赖包,将该包上传到HDFS的/tez目录下,mapreduce提供了一个分布式缓存,该缓存会在第一次运行时读入到各自的task之中。hadoop101上解压缩到tar -zxvf apache-tez-0.9.1-bin.tar.gz -C /opt/module。
ODS层的搭建
ODS:原始数据层,用于存放原始数据,直接加载原始日志、数据,数据保持原貌不变
在搭建ODS层时,ODS层的数据不会经常被运算,且数据量是最大的,所以可以配置ODS层为压缩。在这次项目中,配置有LZO压缩。
在配置压缩后建表:
启动日志表
- CREATE EXTERNAL TABLE ods_start_log (`line` string)
- PARTITIONED BY (`dt` string)
- STORED AS
- INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
- LOCATION '/warehouse/gmall/ods/ods_start_log';
建表后直接从hdfs上加载即可
load data inpath '/origin_data/gmall/log/topic_start/2019-12-21' into table gmall.ods_start_log partition(dt='2019-12-21');
事件日志表
- CREATE EXTERNAL TABLE ods_event_log(`line` string)
- PARTITIONED BY (`dt` string)
- STORED AS
- INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
- LOCATION '/warehouse/gmall/ods/ods_event_log';
load data inpath '/origin_data/gmall/log/topic_event/2019-12-21' into table gmall.ods_event_log partition(dt='2019-12-21');
DWD层
对ODS层进行了简单的清洗,去除空值,脏数据等操作后可以提取出ODS层
启动日志表
- CREATE EXTERNAL TABLE dwd_start_log(
- `mid_id` string,
- `user_id` string,
- `version_code` string,
- `version_name` string,
- `lang` string,
- `source` string,
- `os` string,
- `area` string,
- `model` string,
- `brand` string,
- `sdk_version` string,
- `gmail` string,
- `height_width` string,
- `app_time` string,
- `network` string,
- `lng` string,
- `lat` string,
- `entry` string,
- `open_ad_type` string,
- `action` string,
- `loading_time` string,
- `detail` string,
- `extend1` string
- )
- PARTITIONED BY (dt string)
- stored as parquet
- location '/warehouse/gmall/dwd/dwd_start_log/'
- TBLPROPERTIES('parquet.compression'='lzo')
建表之后导入数据,启动日志表中的每一行都是一个json,因此此处使用了get_json_object函数,函数是单进单出,第二个参数类似于正则中的匹配项,$
代表整行,.mid
表示一个属性
- insert overwrite table dwd_start_log
- PARTITION (dt='2019-12-21')
- select
- get_json_object(line,'$.mid') mid_id,
- get_json_object(line,'$.uid') user_id,
- get_json_object(line,'$.vc') version_code,
- get_json_object(line,'$.vn') version_name,
- get_json_object(line,'$.l') lang,
- get_json_object(line,'$.sr') source,
- get_json_object(line,'$.os') os,
- get_json_object(line,'$.ar') area,
- get_json_object(line,'$.md') model,
- get_json_object(line,'$.ba') brand,
- get_json_object(line,'$.sv') sdk_version,
- get_json_object(line,'$.g') gmail,
- get_json_object(line,'$.hw') height_width,
- get_json_object(line,'$.t') app_time,
- get_json_object(line,'$.nw') network,
- get_json_object(line,'$.ln') lng,
- get_json_object(line,'$.la') lat,
- get_json_object(line,'$.entry') entry,
- get_json_object(line,'$.open_ad_type') open_ad_type,
- get_json_object(line,'$.action') action,
- get_json_object(line,'$.loading_time') loading_time,
- get_json_object(line,'$.detail') detail,
- get_json_object(line,'$.extend1') extend1
- from ods_start_log
- where dt='2019-12-21';
事件日志表不是一个单纯的json。因此需要自定义UDF函数。
自定义UDF函数
UDF
- /*
- 解析公共字段
- TODO 将传入的line,用“|”切割,取出服务器时间serverTime和json数据
- 根据传入的key,获取对应的value值
- */
UDTF函数:
- /*
- 解析具体事件字段
- TODO 需要继承GenericUDTF。
- */
在本次项目中只分析一个用户的主题
DWS层
DWS层主要的目的是根据DWD层的数据建立一个宽表,使用宽表去分析,避免大量join查询,而宽表的难以修改的缺陷再大数据领域体现不多
此次业务是获取当日、当周、当月活跃设备数,活不活跃只需要看启动日志就行了~
- create external table dws_uv_detail_day
- (
- `mid_id` string COMMENT '设备唯一标识',
- `user_id` string COMMENT '用户标识',
- `version_code` string COMMENT '程序版本号',
- `version_name` string COMMENT '程序版本名',
- `lang` string COMMENT '系统语言',
- `source` string COMMENT '渠道号',
- `os` string COMMENT '安卓系统版本',
- `area` string COMMENT '区域',
- `model` string COMMENT '手机型号',
- `brand` string COMMENT '手机品牌',
- `sdk_version` string COMMENT 'sdkVersion',
- `gmail` string COMMENT 'gmail',
- `height_width` string COMMENT '屏幕宽高',
- `app_time` string COMMENT '客户端日志产生时的时间',
- `network` string COMMENT '网络模式',
- `lng` string COMMENT '经度',
- `lat` string COMMENT '纬度'
- )
- partitioned by(dt string)
- stored as parquet
- location '/warehouse/gmall/dws/dws_uv_detail_day';
- --<同一个mid的多个用户需要去重,但是不能简单的将数据取出,因为DWS层还是需要有数据的一些明细信息,为了进一步处理>
- --<所以在此处我使用collect_set(去重合并)然后通过concat_ws()拼接为字符串>
- insert overwrite table dws_uv_detail_day
- partition(dt='2019-12-21')
- select
- mid_id,
- concat_ws('|', collect_set(user_id)) user_id,
- concat_ws('|', collect_set(version_code)) version_code,
- concat_ws('|', collect_set(version_name)) version_name,
- concat_ws('|', collect_set(lang))lang,
- concat_ws('|', collect_set(source)) source,
- concat_ws('|', collect_set(os)) os,
- concat_ws('|', collect_set(area)) area,
- concat_ws('|', collect_set(model)) model,
- concat_ws('|', collect_set(brand)) brand,
- concat_ws('|', collect_set(sdk_version)) sdk_version,
- concat_ws('|', collect_set(gmail)) gmail,
- concat_ws('|', collect_set(height_width)) height_width,
- concat_ws('|', collect_set(app_time)) app_time,
- concat_ws('|', collect_set(network)) network,
- concat_ws('|', collect_set(lng)) lng,
- concat_ws('|', collect_set(lat)) lat
- from dwd_start_log
- where dt='2019-12-21'
- group by mid_id;
一周用户活跃明细,安装group 问题?如何判断为同一周,项目中使用的方法是创建额外字段,本日期对应的周一和周日日期
- drop table if exists dws_uv_detail_wk;
- create external table dws_uv_detail_wk(
- `mid_id` string COMMENT '设备唯一标识',
- `user_id` string COMMENT '用户标识',
- `version_code` string COMMENT '程序版本号',
- `version_name` string COMMENT '程序版本名',
- `lang` string COMMENT '系统语言',
- `source` string COMMENT '渠道号',
- `os` string COMMENT '安卓系统版本',
- `area` string COMMENT '区域',
- `model` string COMMENT '手机型号',
- `brand` string COMMENT '手机品牌',
- `sdk_version` string COMMENT 'sdkVersion',
- `gmail` string COMMENT 'gmail',
- `height_width` string COMMENT '屏幕宽高',
- `app_time` string COMMENT '客户端日志产生时的时间',
- `network` string COMMENT '网络模式',
- `lng` string COMMENT '经度',
- `lat` string COMMENT '纬度',
- `monday_date` string COMMENT '周一日期',
- `sunday_date` string COMMENT '周日日期'
- ) COMMENT '活跃用户按周明细'
- PARTITIONED BY (`wk_dt` string)
- stored as parquet
- location '/warehouse/gmall/dws/dws_uv_detail_wk/';
按照如下插入数据,下一个周一提前7天就是当前所在周的周一,date_add(next_day('2019-12-21','MO'),-7),周日同理 week字段就可以用concat(date_add( next_day('2019-12-21','MO'),-7), '_' , date_add(next_day('2019-12-21','MO'),-1)
唯一标识
- insert overwrite table dws_uv_detail_wk partition(wk_dt)
- select
- mid_id,
- concat_ws('|', collect_set(user_id)) user_id,
- concat_ws('|', collect_set(version_code)) version_code,
- concat_ws('|', collect_set(version_name)) version_name,
- concat_ws('|', collect_set(lang)) lang,
- concat_ws('|', collect_set(source)) source,
- concat_ws('|', collect_set(os)) os,
- concat_ws('|', collect_set(area)) area,
- concat_ws('|', collect_set(model)) model,
- concat_ws('|', collect_set(brand)) brand,
- concat_ws('|', collect_set(sdk_version)) sdk_version,
- concat_ws('|', collect_set(gmail)) gmail,
- concat_ws('|', collect_set(height_width)) height_width,
- concat_ws('|', collect_set(app_time)) app_time,
- concat_ws('|', collect_set(network)) network,
- concat_ws('|', collect_set(lng)) lng,
- concat_ws('|', collect_set(lat)) lat,
- date_add(next_day('2019-12-21','MO'),-7),
- date_add(next_day('2019-12-21','MO'),-1),
- concat(date_add( next_day('2019-12-21','MO'),-7), '_' , date_add(next_day('2019-12-21','MO'),-1)
- )
- from dws_uv_detail_day
- where dt>=date_add(next_day('2019-12-21','MO'),-7) and dt<=date_add(next_day('2019-12-21','MO'),-1)
- group by mid_id;
ADS
ADS只要继续在DWD层上处理就可以实现我们的需求。
- create external table ads_uv_count(
- `dt` string COMMENT '统计日期',
- `day_count` bigint COMMENT '当日用户数量',
- `wk_count` bigint COMMENT '当周用户数量',
- `mn_count` bigint COMMENT '当月用户数量',
- `is_weekend` string COMMENT 'Y,N是否是周末,用于得到本周最终结果',
- `is_monthend` string COMMENT 'Y,N是否是月末,用于得到本月最终结果'
- ) COMMENT '活跃设备数'
- row format delimited fields terminated by '\t'
- location '/warehouse/gmall/ads/ads_uv_count/';
插入语句
- insert into table ads_uv_count
- select
- '2019-12-21' dt,
- daycount.ct,
- wkcount.ct,
- mncount.ct,
- if(date_add(next_day('2019-12-21','MO'),-1)='2019-12-21','Y','N') ,
- if(last_day('2019-12-21')='2019-12-21','Y','N')
- from
- (
- select
- '2019-12-21' dt,
- count(*) ct
- from dws_uv_detail_day
- where dt='2019-12-21'
- )daycount join
- (
- select
- '2019-12-21' dt,
- count (*) ct
- from dws_uv_detail_wk
- where wk_dt=concat(date_add(next_day('2019-12-21','MO'),-7),'_' ,date_add(next_day('2019-12-21','MO'),-1) )
- ) wkcount on daycount.dt=wkcount.dt
- join
- (
- select
- '2019-12-21' dt,
- count (*) ct
- from dws_uv_detail_mn
- where mn=date_format('2019-12-21','yyyy-MM')
- )mncount on daycount.dt=mncount.dt;