赞
踩
课程目标
drop table if exists itcast_ods.weblog_origin;
create table itcast_ods.weblog_origin(
valid Boolean,
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
http_referer string,
http_user_agent string,
guid string)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
注意事项
parquet中字段数据类型要与hive表字段类型保持一致!!
drop table if exists itcast_ods.click_pageviews; create table itcast_ods.click_pageviews( session string, remote_addr string, time_local string, request string, visit_step int, page_staylong bigint, http_referer string, http_user_agent string, body_bytes_sent string, status string, guid string) partitioned by (dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
drop table if exists itcast_ods.click_stream_visit;
create table itcast_ods.click_stream_visit(
guid string,
session string,
remote_addr string,
inTime string,
outTime string,
inPage string,
outPage string,
referal string,
pageVisits int)
partitioned by (dt string)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
事实表的数据中,有些属性共同组成了一个字段(糅合在一起),比如年月日时分秒构成了时间,当需要根据某一属性进行分组统计的时候,需要截取拼接之类的操作,效率极低。
**宽表:**为了分析方便,可以把事实表中的一个字段切割提取多个属性出来构成新的字段,因为字段变多了,所以称为宽表,原来的成为窄表。
又因为宽表的信息更加清晰明细,所以也可以称之为明细表。
建明细表itcast_dw.fact_weblog_detail
drop table itcast_dw.fact_weblog_detail; create table itcast_dw.fact_weblog_detail( valid string, --有效标识 remote_addr string, --来源IP, guid string, --用户标识 remote_user string, --用户信息 time_local string, --访问完整时间 daystr string, --访问日期 timestr string, --访问时间 month string, --访问月 day string, --访问日 hour string, --访问时 request string, --请求的url status string, --响应码 body_bytes_sent string, --传输字节数 http_referer string, --来源url ref_host string, --来源的host ref_path string, --来源的路径 ref_query string, --来源参数query ref_query_id string, --来源参数query的值 http_user_agent string --客户终端标识 ) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
通过查询插入数据到明细宽表 fact_weblog_detail中
Insert into table itcast_dw.fact_weblog_detail partition(dt='20191101') select valid, remote_addr, guid, remote_user, time_local, substring(time_local,0,10) as daystr, --年月日 substring(time_local,12) as tmstr, --时分秒 substring(time_local,6,2) as month,--月份 substring(time_local,9,2) as day,--天 substring(time_local,12,2) as hour, --小时 request, status, body_bytes_sent, http_referer, parse_url(regexp_replace(http_referer, '"', ''), 'HOST') as host, --host parse_url(regexp_replace(http_referer, '"', ''), 'PATH') as path, --path parse_url(regexp_replace(http_referer, '"', ''), 'QUERY') as query, -- query parse_url(regexp_replace(http_referer, '"', ''), 'QUERY','id') as query_id, --query:id http_user_agent FROM itcast_ods.weblog_origin where dt ='20191101' ;
指标是网站分析的基础,用来记录和衡量访问者在网站自的各种行为。比如我们经常说的流量就是一个网站指标,它是用来衡量网站获得的访问量。在进行流量分析之前,我们先来了解一些常见的指标。
PageView浏览次数(PV)
Unique Visitor独立访客(UV)
访问次数(VV)
IP
平均访问频度
人均浏览页数(平均访问深度)
平均访问时长
跳出率
对比分析
访问明细
数据展示 |
---|
来源分类
搜索引擎
搜索词
最近7日的访客搜索记录
来源升降榜
提供开通统计后任意两日的TOP10000搜索词、来路域名引入流量的对比情况,并按照变化的剧烈程度提供排行榜。 用户可通过此功能快速找到哪些来路对网站流量的影响比较大,从而及时排查相应来路问题。
数据展示 |
---|
受访域名
提供访客对网站中各个域名的访问情况。 一般情况下,网站不同域名提供的产品、内容各有差异,通过此功能用户可以了解不同内容的受欢迎程度以及网站运营成效。
受访页面
提供访客对网站中各个页面的访问情况。 站内入口页面为访客进入网站时浏览的第一个页面,如果入口页面的跳出率较高则需要关注并优化;站内出口页面为访客访问网站的最后一个页面,对于离开率较高的页面需要关注并优化。
受访升降榜
提供开通统计后任意两日的TOP10000受访页面的浏览情况对比,并按照变化的剧烈程度提供排行榜。 可通过此功能验证经过改版的页面是否有流量提升或哪些页面有巨大流量波动,从而及时排查相应问题。
热点图
记录访客在页面上的鼠标点击行为,通过颜色区分不同区域的点击热度;支持将一组页面设置为"关注范围",并可按来路细分点击热度。 通过访客在页面上的点击量统计,可以了解页面设计是否合理、广告位的安排能否获取更多佣金等。
列表图 |
---|
地区运营商
提供各地区访客、各网络运营商访客的访问情况分布。 地方网站、下载站等与地域性、网络链路等结合较为紧密的网站,可以参考此功能数据,合理优化推广运营方案。
终端详情
提供网站访客所使用的浏览终端的配置情况。 参考此数据进行网页设计、开发,可更好地提高网站兼容性,以达到良好的用户交互体验。
新老访客
当日访客中,历史上第一次访问该网站的访客记为当日新访客;历史上已经访问过该网站的访客记为老访客。 新访客与老访客进入网站的途径和浏览行为往往存在差异。该功能可以辅助分析不同访客的行为习惯,针对不同访客优化网站,例如为制作新手导航提供数据支持等。
数据展示 |
---|
对于指标业务含义的解读是关键!
select count(*) from itcast_dw.fact_weblog_detail where dt ="20191101" and valid = "true"; --排除静态资源
select count(distinct guid) as uvs from itcast_dw.fact_weblog_detail where dt ="20191101";
select count( session) as vvs from itcast_ods.click_stream_visit where dt ="20191101";
select count(distinct remote_addr) as ips from itcast_dw.fact_weblog_detail where dt ="20191101";
保存数据
--创建ads层基础指标表 drop table if exists itcast_ads.webflow_basic_info; create table itcast_ads.webflow_basic_info( month string, day string, pv bigint, uv bigint , ip bigint, vv bigint ) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); -- Spark 2.x版本中默认不支持笛卡尔积操作,开启笛卡尔积 set spark.sql.crossJoin.enabled=true; --合并指标插入指标表 insert into table itcast_ads.webflow_basic_info partition(dt="20191101") select '201911','01',a.*,b.* from ( select count(*) as pv, count(distinct guid) as uv, count(distinct remote_addr) as ips from itcast_dw.fact_weblog_detail where dt ='20191101' ) a join ( select count(session) as vvs from itcast_ods.click_stream_visit where dt ="20191101" ) b;
多维统计分析
按时间维度
--计算该处理批次(一天)中的各小时pvs drop table if exists itcast_ads.pvs_everyhour_oneday; create table itcast_ads.pvs_everyhour_oneday ( month string, day string, hour string, pvs bigint ) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); --保存数据 insert into table itcast_ads.pvs_everyhour_oneday partition(dt='20191101') select month , day , hour , count(*) as pvs from itcast_dw.fact_weblog_detail a where dt='20191101' group by month,day,hour; --验证数据 select * from itcast_ads.pvs_everyhour_oneday; --计算每天的pvs drop table if exists itcast_ads.pvs_everyday; create table itcast_ads.pvs_everyday( pvs bigint, month string, day string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); --保存数据 insert into table itcast_ads.pvs_everyday select count(*) as pvs, month, day from itcast_dw.fact_weblog_detail group by month,day; --验证数据 select * from itcast_ads.pvs_everyday ;
按referer、时间维度
统计每小时各来访url产生的pv量
drop table if exists itcast_ads.pvs_referer_everyhour; create table itcast_ads.pvs_referer_everyhour( referer_url string, referer_host string, month string, day string, hour string, pv_referer_cnt bigint) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.pvs_referer_everyhour partition(dt='20191101') select http_referer, ref_host, month, day, hour, count(*) as pv_referer_cnt from itcast_dw.fact_weblog_detail group by http_referer,ref_host,month,day,hour having ref_host is not null order by hour asc,day asc,month asc,pv_referer_cnt desc; --验证数据 select * from itcast_ads.pvs_referer_everyhour limit 5;
--统计每小时各来访host的产生的pv数并排序 drop table if exists itcast_ads.pvs_refererhost_everyhour; create table itcast_ads.pvs_refererhost_everyhour( ref_host string, month string, day string, hour string, ref_host_cnts bigint ) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.pvs_refererhost_everyhour partition(dt='20191101') select ref_host, month, day, hour, count(*) as ref_host_cnts from itcast_dw.fact_weblog_detail group by ref_host,month,day,hour having ref_host is not null order by hour asc,day asc,month asc,ref_host_cnts desc; --验证数据 select * from itcast_ads.pvs_refererhost_everyhour limit 5;
结果展示 |
---|
今日所有来访者平均请求浏览的页面数。该指标可以说明网站对用户的粘性。
计算方式:总页面请求数pv/独立访客数uv
drop table if exists itcast_ads.avgpv_user_everyday; create table itcast_ads.avgpv_user_everyday( day string, avgpv string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.avgpv_user_everyday select '20191101', sum(b.pvs)/count(b.guid) --pv/uv from ( select guid, count(1) as pvs from itcast_dw.fact_weblog_detail where dt='20191101' group by guid ) b; --验证数据 select * from itcast_ads.avgpv_user_everyday;
平均每个独立访客一天内访问网站的次数(产生的session个数)。
计算方式:访问次数vv/独立访客数uv
select count(session)/ count(distinct guid) from itcast_ods.click_stream_visit where dt ="20191101"; --符合逻辑
平均每次访问(会话)在网站上的停留时间。体现网站对访客的吸引程度。
平均访问时长=访问总时长/访问次数。
补充sql.
跳出率是指用户到达你的网站上并在你的网站上仅浏览了一个页面就离开的访问次数与所有访问次数的百分比。是评价网站性能的重要指标。
需求描述:统计每小时各来访host的产生的pvs数最多的前N个(topN)。
1 对每个小时内的来访host次数倒序排序标号
select
ref_host,
ref_host_cnts,
concat(month,day,hour),
row_number() over (partition by concat(month,day,hour)
order by ref_host_cnts desc) as od
from itcast_ads.pvs_refererhost_everyhour;
效果如下: |
---|
2 根据上述row_number的功能,过滤出各小时的ref_host访问次数topn
drop table if exists itcast_ads.pvs_refhost_topn_everyhour; create table itcast_ads.pvs_refhost_topn_everyhour( hour string, toporder string, ref_host string, ref_host_cnts string) partitioned by (dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.pvs_refhost_topn_everyhour partition(dt='20191101') select t.hour, t.od, t.ref_host, t.ref_host_cnts from ( select ref_host, ref_host_cnts, concat(month,day,hour) as hour, row_number() over (partition by concat(month,day,hour) order by ref_host_cnts desc) as od from itcast_ads.pvs_refererhost_everyhour ) t where od<=3; --验证数据 select * from itcast_ads.pvs_refhost_topn_everyhour limit 10;
结果如下: |
---|
主要是针对数据中的request进行统计分析,比如各页面PV ,各页面UV 等。
以上指标无非就是根据页面的字段group by。例如:
--统计各页面pv
select
request as request,
count(request) as request_counts
from itcast_dw.fact_weblog_detail where request is not null
group by request
order by request_counts desc
limit 20;
--统计每日最热门的页面top10 drop table if exists itcast_ads.hotpages_everyday; create table itcast_ads.hotpages_everyday( day string, url string, pvs string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.hotpages_everyday select '20191101', a.request, a.request_counts from ( select request, count(request) as request_counts from itcast_dw.fact_weblog_detail where dt='20191101' group by request having request is not null ) a order by a.request_counts desc limit 10; --验证数据 select * from itcast_ads.hotpages_everyday limit 5;
需求描述:按照时间维度比如小时来统计独立访客及其产生的pv。
--时间维度:时 drop table if exists itcast_ads.user_dstc_ip_h; create table itcast_ads.user_dstc_ip_h ( guid string, pvs bigint, hour string) partitioned by (dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert into table itcast_ads.user_dstc_ip_h partition(dt='20191101') select guid, count(1) as pvs, concat(month,day,hour) as hour from itcast_dw.fact_weblog_detail Where dt='20191101' group by concat(month,day,hour),guid; --验证数据 select * from itcast_ads.user_dstc_ip_h where dt='20191101' limit 5; --在此结果表之上,可以进一步统计,如每小时独立访客总数: select count(1) as dstc_ip_cnts,hour from itcast_ads.user_dstc_ip_h where dt='20191101' group by hour limit 5;
--时间维度:日
select
guid,
count(1) as counts,
concat(month,day) as day from
itcast_dw.fact_weblog_detail Where dt='20191101'
group by concat(month,day),guid limit 5;
--时间维度:月
select
guid,
count(1) as counts,
month from
itcast_dw.fact_weblog_detail
group by month,guid limit 5;
需求:将每天的新访客统计出来。
实现思路:创建一个去重访客累积表,然后将每日访客对比累积表。
思路分析 |
---|
--历日去重访客累积表 drop table if exists itcast_dw.user_dsct_history; create table itcast_dw.user_dsct_history ( day string, guid string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); --每日新访客表 drop table if exists itcast_dw.user_new_d; create table itcast_dw.user_new_d ( day string, guid string) partitioned by(dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); --每日新用户插入新访客表 insert into table itcast_dw.user_new_d partition(dt='20191101') select tmp.day as day, tmp.guid as new_ip from ( select today.day as day, today.guid as guid, old.guid as old_guid from ( select distinct guid as guid, "20191101" as day from itcast_dw.fact_weblog_detail where dt="20191101" ) today left outer join itcast_dw.user_dsct_history old on today.guid=old.guid ) tmp where tmp.old_guid is null; --每日新用户追加到累计表 insert into table itcast_dw.user_dsct_history select day, guid from itcast_dw.user_new_d where dt='20191101';
验证查看
select
count(distinct guid)
from itcast_dw.fact_weblog_detail ;
select
count(1)
from itcast_dw.user_dsct_history
;
select
count(1)
from itcast_dw.user_new_d
where dt='20191101';
需求:查询今日所有回头访客及其访问次数。
查询今日所有回头访客及其访问次数 |
---|
实现思路:上表中出现次数>1的访客,即回头访客;反之,则为单次访客。
drop table if exists itcast_ads.user_returning; create table itcast_ads.user_returning( day string, guid string, acc_cnt string ) partitioned by (dt string) STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY'); insert overwrite table itcast_ads.user_returning partition(dt='20191101') select tmp.day, tmp.guid, tmp.acc_cnt from ( select '20191101' as day, guid, count(session) as acc_cnt from itcast_ods.click_stream_visit group by guid ) tmp where tmp.acc_cnt>1;
需求:统计出每天所有用户访问网站的平均次数(visit)
实现思路:总visit数/去重总用户数
drop table if exists itcast_ads.per_vsits_day;
create table itcast_ads.per_vsits_day(
day string,
per_visit double
)
STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
insert into table itcast_ads.per_vsits_day
select
'20191101',
count(session)/count(distinct guid)
from itcast_ods.click_stream_visit
where dt='20191101';
指标的统计分析上过上面步骤之后都已经计算完成存储在hive表中,为了数据的可视化需要把指标结果数据从hive中导出到mysq中以便对接可视化系统。
常见数据导出工具sqoop,kettle等。
本项目使用kettle完成数据导出。
具体步骤
确定hive表中字段
准备mysql表接收数据
配置kettle导出数据组件
执行导出任务
应用场景
使用限制
实现逻辑
导出itcast_ads.ads_trade_orde表数据到mysql
手动mysql创建目标表
create database itcast_ads_shop; use itcast_ads_shop; DROP TABLE IF EXISTS `ads_trade_order`; CREATE TABLE `ads_trade_order` ( `area_type` varchar(50) DEFAULT NULL, `region_name` varchar(50) DEFAULT NULL, `city_name` varchar(50) DEFAULT NULL, `category_type` varchar(50) DEFAULT NULL, `category_1st_name` varchar(50) DEFAULT NULL, `category_2d_name` varchar(50) DEFAULT NULL, `payment_name` varchar(50) DEFAULT NULL, `total_count` bigint(20) DEFAULT NULL, `total_goods_num` bigint(20) DEFAULT NULL, `total_money` double(20,4) DEFAULT NULL, `dt` varchar(20) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
配置kettle转换任务
注意:为防止中文乱码表输出连接指定选项! characterEncoding=utf8 |
mysql验证数据 |
应用场景
使用技术
实现逻辑
具体步骤
可以在表输入sql语句中指定分区过滤数据导出到mysql
kettle转换任务与全量导出一致只需修改kettle表输入组件的sql语句添加分区条件。
分区过滤条件 |
---|
添加命名参数 |
执行kettle转换任务并验证mysql数据 |
工作流(Workflow)
工作流解决问题
一个完整的大数据分析系统通常都是由多个前后依赖的模块组合构成的
模块单元、模块内部往往存在时间上的先后依赖关系,且存在着周期性重复执行的工作。为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行。
crontab
使用linux的crontab来定义调度,但是缺点比较明显,无法设置依赖复杂任务调度。且需要编写相关shell脚本。
当下企业两种选择,
知名度比较高的是Apache Oozie,但是其配置工作流的过程是编写大量的XML配置,而且代码复杂度比较高,不易于二次开发。
Azkaban是由领英公司推出的一个批量工作流任务调度器,用于在一个工作流内以一个特定的顺序运行一组工作和流程。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪工作流。
azkaban界面 |
---|
业务数据同步
mysql–>hive
shell脚本
all_import.sh
#------------------------------------- # Author: www.itcast.cn # Date: 2020-02 # Description: 全量调度脚本 #------------------------------------- #!/bin/bash #刷新下环境变量 source /etc/profile #获取当前时间 log_day=`date +%Y%m%d` #昨日数据 yesterday=`date +"%Y%m%d" -d '-1 days'` #1天前 #定义一些业务参数,数据路径,表名等 #kettle_home kettle_home=/export/servers/data-integration #kettle转换文件路径 ktr_path=/export/servers/scheduler/data_collection/ktr #日志文件路径 log_path=/var/log/scheduler #判断是否有参数传入 if [ $1 ] then run_data_date=$1 else run_data_date=${yesterday} fi echo ${run_data_date} #全量导出,mysql-->hive pan.sh -file #清空kettle 安装目录下system/karaf/caches,否则报错 no sutable driver。。。 rm -rf ${kettle_home}/system/karaf/caches #执行kettle转换任务 #开始数据同步任务 echo "--------------开始数据全量同步任务 $(date +'%Y-%m-%d %H:%m:%S')" >>${log_path}/${log_day}.log 2>&1 ${kettle_home}/kitchen.sh -file ${ktr_path}/1-全量抽取作业.kjb -param:dt=${run_data_date} >>${log_path}/${log_day}.log 2>&1
incr_import.sh
#------------------------------------- # Author: www.itcast.cn # Date: 2020-02 # Description: 增量同步脚本 #------------------------------------- #!/bin/bash #刷新下环境变量 source /etc/profile #获取当前时间 log_day=`date +%Y%m%d` #昨日数据 yesterday=`date +"%Y%m%d" -d '-1 days'` #1天前 #定义一些业务参数,数据路径,表名等 #kettle_home kettle_home=/export/servers/data-integration #kettle转换文件路径 ktr_path=/export/servers/scheduler/data_collection/ktr #日志文件路径 log_path=/var/log/scheduler #判断是否有参数传入 if [ $1 ] then run_data_date=$1 else run_data_date=${yesterday} fi echo ${run_data_date} #全量导出,mysql-->hive pan.sh -file #清空kettle 安装目录下system/karaf/caches,否则报错 no sutable driver。。。 rm -rf ${kettle_home}/system/karaf/caches #执行kettle转换任务 #开始数据同步任务 echo "--------------开始数据增量同步任务 $(date +'%Y-%m-%d %H:%m:%S')">>${log_path}/${log_day}.log 2>&1 ${kettle_home}/kitchen.sh -file ${ktr_path}/2-增量抽取作业.kjb -param:dt=${run_data_date} >>${log_path}/${log_day}.log 2>&1
点击流数据同步
flume作为流式日志采集工具会一直运行。
flume 启动脚本
#!/bin/bash #日志文件路径 log_path=/var/log/scheduler case $1 in "start"){ echo " --------启动采集flume-------$(date +'%Y-%m-%d %H:%m:%S')" >>${log_path}/flume.log 2>&1 nohup /export/servers/apache-flume-1.8.0-bin/bin/flume-ng agent --conf-file /export/servers/apache-flume-1.8.0-bin/job/interceptor2hdfs.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >>${log_path}/flume.log 2>&1 & };; "stop"){ echo "--------停止采集flume-------$(date +'%Y-%m-%d %H:%m:%S')" >>${log_path}/flume.log 2>&1 ps -ef | grep interceptor2hdfs | grep -v grep |awk '{print $2}' | xargs kill };; esac
点击流日志
数据预处理模块按照数据处理过程和业务需求,可以分成3个步骤执行:
数据预处理清洗
点击流模型之pageviews
点击流模型之visit
具体步骤
预处理spark程序 打成jar包
preprocess.sh
/export/servers/spark-2.2.0-bin-hadoop2.6/bin/spark-submit \
--class com.itheima.main.ETLApp \
--master spark://node1:7077 \
--executor-memory 1G \
--total-executor-cores 5 \
click_log_etl_16-1.0-SNAPSHOT.jar \
excute.sh
#------------------------------------- # Author: www.itcast.cn # Date: 2020-02 # Description: 业务数据分析调度脚本 #------------------------------------- #!/bin/bash #刷新下环境变量 source /etc/profile #设置环境变量 export SPARK_HOME=/export/servers/spark-2.2.0-bin-hadoop2.6 export KETTLE_HOME=/export/servers/data-integration #获取当前时间 log_day=`date +%Y%m%d` #昨日数据 yesterday=`date +"%Y%m%d" -d '-1 days'` #1天前 #判断是否有参数传入 if [ $1 ] then PARAM_DT=$1 else PARAM_DT=${yesterday} fi #最好打印到日志中 echo ${PARAM_DT} #设置SPARK提交位置 SPARK_MASTER="--master spark://node1:7077" #设置SPARK SHUFFLE分区数量 SPARK_SQL_SHUFFLE_PARTITIONS="set spark.sql.shuffle.partitions=1;" #/// # # ETL SQL语句变量 # #/// # 1. 加载店铺维度表数据 SQL_ETL_LOAD_DIMSHOP="insert overwrite table itcast_dw.dim_shops partition(dt='${PARAM_DT}') select t1.shopid as shop_id, -- 店铺id t1.shopname as shop_name, -- 店铺名称 t2.orgid as city_id, -- 城市组织机构id t2.orgname as city_name, -- 城市组织机构名称 t3.orgid as region_id, -- 区域组织机构id t3.orgname as region_name -- 区域组织机构名称 from (select shopid, shopname, areaid from itcast_ods.itcast_shops where dt='${PARAM_DT}') t1 -- 店铺数据 left join (select orgid, parentid, orgname, orglevel from itcast_ods.itcast_org where orglevel=2 and dt='${PARAM_DT}') t2 -- 城市组织机构数据 on t1.areaid = t2.orgid left join (select orgid, parentid, orgname, orglevel from itcast_ods.itcast_org where orglevel=1 and dt='${PARAM_DT}') t3 -- 省份组织机构数据 on t2.parentid = t3.orgid;" # 2. 加载商品分类维度数据 SQL_ETL_LOAD_DIM_GOODS_CAT="insert overwrite table itcast_dw.dim_goods_cat partition(dt='${PARAM_DT}') select t3.catid as cat_3d_id, -- 三级分类id t3.catname as cat_3d_name, -- 三级分类名称 t2.catid as cat_2d_id, -- 二级分类id t2.catname as cat_2d_name, -- 二级分类名称 t1.catid as cat_1t_id, -- 一级分类id t1.catname as cat_1t_name -- 一级分类名称 from (select catid, catname, parentid from itcast_ods.itcast_goods_cats where cat_level=3 and dt='${PARAM_DT}') t3 -- 商品三级分类数据 left join (select catid, catname, parentid from itcast_ods.itcast_goods_cats where cat_level=2 and dt='${PARAM_DT}') t2 -- 商品二级分类数据 on t3.parentid = t2.catid left join (select catid, catname, parentid from itcast_ods.itcast_goods_cats where cat_level=1 and dt='${PARAM_DT}') t1 -- 商品一级分类数据 on t2.parentid = t1.catid;" # 3. 加载支付维度数据 SQL_ETL_LOAD_DIM_PAYMENT="insert overwrite table itcast_dw.dim_payment partition(dt='${PARAM_DT}') select t1.id as payment_id, -- 支付方式id t1.payName as payment_name -- 支付方式名称 from (select id, payName from itcast_ods.itcast_payments where dt='${PARAM_DT}') t1;" # 4. 加载订单明细事实表 SQL_ETL_LOAD_FACT_ORDER_GOODS="insert overwrite table itcast_dw.fact_order_goods_wide partition(dt='${PARAM_DT}') select t1.orderid as order_id, t3.goodscatid as goods_cat_3d_id, t3.shopid as shop_id, t1.paytype as payment_id, t2.goodsnum as goods_num, t2.goodsnum*t2.payprice as pay_money, t1.paytime as paytime from (select orderid, paytype, paytime from itcast_dw.fact_orders where dt='${PARAM_DT}') t1 -- 订单表数据 left join (select orderid, goodsid, goodsnum, payprice from itcast_ods.itcast_order_goods where dt='${PARAM_DT}') t2 -- 订单明细数 on t1.orderid = t2.orderid left join (select goodsid, shopid, goodscatid from itcast_dw.dim_goods where dw_end_date = '9999-12-31') t3 -- 商品数量 on t2.goodsid = t3.goodsid;" #/// # # 执行Spark SQL # #/// # 加载DIM_SHOP维度表数据 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_ETL_LOAD_DIMSHOP}" # 加载DIM_GOODS_CAT维度表数据 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_ETL_LOAD_DIM_GOODS_CAT}" # 加载DIM_PAYMENT维度表数据 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_ETL_LOAD_DIM_PAYMENT}" # 加载FACT_ORDER_GOODS订单明细数据 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_ETL_LOAD_FACT_ORDER_GOODS}"
execute_quota.sh
#------------------------------------- # Author: www.itcast.cn # Date: 2020-02 # Description: 业务数据分析调度脚本 #------------------------------------- #!/bin/bash #刷新下环境变量 source /etc/profile #设置环境变量 export SPARK_HOME=/export/servers/spark-2.2.0-bin-hadoop2.6 #获取当前时间 log_day=`date +%Y%m%d` #昨日数据 yesterday=`date +"%Y%m%d" -d '-1 days'` #1天前 #判断是否有参数传入 if [ $1 ] then PARAM_DT=$1 else PARAM_DT=${yesterday} fi #最好打印到日志中 echo ${PARAM_DT} #设置SPARK提交位置 SPARK_MASTER="--master spark://node1:7077" #设置SPARK SHUFFLE分区数量 SPARK_SQL_SHUFFLE_PARTITIONS="set spark.sql.shuffle.partitions=1;" #/// # # ETL SQL语句变量 # #/// # 1. 全国、无商品分类不分支付类型维度分析 SQL_QUOTA1="insert overwrite table itcast_ads.ads_trade_order partition(dt='20190909') select '全国' as area_type, '' as region_name, '' as city_name, '' as category_type, '' as category_1st_name, '' as category_2d_name, '所有' as payment_name, count(distinct t1.order_id) as total_count,--订单笔数需要去重 sum(t1.goods_num) as total_goods_num,--总的商品数量 sum(t1.pay_money ) as total_money from (select * from itcast_dw.fact_order_goods_wide where dt='20190909') t1; select * from itcast_ads.ads_trade_order t where t.area_type = '全国'; -- 全国、无商品分类分支付类型维度分析 -- 注意:要使用 insert into,否则数据会被覆写 insert into table itcast_ads.ads_trade_order partition(dt='20190909') select '全国' as area_type, '' as region_name, '' as city_name, '' as category_type, '' as category_1st_name, '' as category_2d_name, t1.payment_name as payment_name, count(distinct t2.order_id) as total_count, sum(t2.goods_num) as total_goods_num, sum(t2.pay_money) as total_money from (select * from itcast_dw.fact_order_goods_wide where dt='20190909') t2 left join (select * from itcast_dw.dim_payment where dt='20190909') t1 on t1.payment_id = t2.payment_id group by t1.payment_name;" # 全国、一级商品分类维度交易信息 SQL_QUOTA2="insert into table itcast_ads.ads_trade_order partition(dt='20190909') select '全国' as area_type, '' as region_name, '' as city_name, '一级分类' as category_type, t1.cat_1t_name as category_1st_name, '' as category_2d_name, '所有' as payment_name, count(distinct t2.order_id) as total_count, sum(case when t2.goods_num is null then 0 else t2.goods_num end ) as total_goods_num, sum(case when t2.pay_money is null then 0 else t2.pay_money end ) as total_money from (select * from itcast_dw.dim_goods_cat where dt='20190909') t1 left join (select * from itcast_dw.fact_order_goods_wide where dt='20190909') t2 on t1.cat_3d_id = t2.goods_cat_3d_id group by t1.cat_1t_name; insert into table itcast_ads.ads_trade_order partition(dt='20190909') select '全国' as area_type, '' as region_name, '' as city_name, '一级分类' as category_type, t3.cat_1t_name as category_1st_name, '' as category_2d_name, t1.payment_name as payment_name, count(distinct t2.order_id) as total_count, sum(t2.goods_num) as total_goods_num, sum(t2.pay_money) as total_money from (select * from itcast_dw.dim_payment where dt='20190909') t1 left join (select * from itcast_dw.fact_order_goods_wide where dt='20190909') t2 on t1.payment_id = t2.payment_id left join (select * from itcast_dw.dim_goods_cat where dt='20190909') t3 on t2.goods_cat_3d_id = t3.cat_3d_id group by t1.payment_name, t3.cat_1t_name;" #/// # # 执行Spark SQL # #/// # 全国、无商品分类维度的交易信息 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_QUOTA1}" # 全国、一级商品分类维度交易信息 $SPARK_HOME/bin/spark-sql $SPARK_MASTER -e "${SPARK_SQL_SHUFFLE_PARTITIONS};${SQL_QUOTA2}"
借助kettle导出hive指标数据到mysql,编写shell脚本
export.sh
#------------------------------------- # Author: www.itcast.cn # Date: 2020-02 # Description: 指标调度脚本 #------------------------------------- #!/bin/bash #刷新下环境变量 source /etc/profile #获取当前时间 log_day=`date +%Y%m%d` #昨日数据 yesterday=`date +"%Y%m%d" -d '-1 days'` #1天前 #定义一些业务参数,数据路径,表名等 #kettle_home kettle_home=/export/servers/data-integration #kettle转换文件路径 ktr_path=/export/servers/scheduler/export/ktr #日志文件路径 log_path=/var/log/scheduler #判断是否有参数传入 if [ $1 ] then run_data_date=$1 else run_data_date=${yesterday} fi echo ${run_data_date} #全量导出,hive-->mysql #清空kettle 安装目录下system/karaf/caches,否则报错 no sutable driver。。。 rm -rf ${kettle_home}/system/karaf/caches #执行kettle转换任务 #开始数据同步任务 echo "--------------开始数据导出任务 $(date +'%Y-%m-%d %H:%m:%S')" >>${log_path}/${log_day}.log 2>&1 ${kettle_home}/pan.sh -file ${ktr_path}/3-指标数据导出.ktr -param:dt=${run_data_date} >>${log_path}/${log_day}.log 2>&1
数据同步
type=command
command=sh /export/servers/scheduler/data_collection/incr_import.sh
数据预处理
type=command
dependencies=incr_import
command=sh /export/servers/scheduler/execute/execute.sh
数据ETL计算
type=command
dependencies=execute
command=sh /export/servers/scheduler/execute/execute_quota.sh
指标导出
type=command
dependencies=execute_quota
command=sh /export/servers/scheduler/export/export.sh
ods层表:
三张表:
weblog_origin:原始数据表,清洗和格式转换,标上逻辑删除,
pageview模型:打上sessionid,步长,停留时长信息
visit模型:pageview基础上进行聚合按照sessionid分组,找到每个会话的开始,结束时间等;
三张表都是parquet格式存储使用snappy压缩。
上面两个模型业务模型。
有个字段数据量比较多,request请求为例包含
host,query,id,name…
拉宽:最终得到一张大宽表,包含所有的事实字段和维度字段。
拉宽两个字段:
时间字段:年月日时分秒–》年,月,日,小时,
substring(),date_fromat
request字段:host,query,id,
parse_url_tuple:可以成组的解析url数据。
注意点:
spark 2.2.2版本,sql是没有parse_url_tuple,
par_url:只能单个解析,
最后进入itcast_dw层的fact_weblog_detail表。
基础指标:pv,uv,vv:会话次数,ip
复合指标:平均频度,次数,vv
流量指标:pv,uv,vv,ip访问数;
宽表有时间字段,可以直接在单表中group by搞定。
借助时间,日期维度表来进行时间维度分析。
单表中维度分析group by维度字段即可;
人均浏览数:总pv/访客数
平均访问频度:vv/uv
平均访问时长
跳出率
每个:分组;
最多的多少个:
组内排序之后的前几名。
topn问题
row_number() over(partition by xx order by xx) r:
按照某某分组按照某字段组内排序,序号是连续不会并列
rank() over():会有并列的问题,跳过重复
desen rank() over:会有并列问题,不跳过重复。
子查询也是使用非常方便,就是我们多次查询前面的查询结果后续查询会使用。
先计算每天的独立访客(新老顾客),
维护一张历史用户表
使用表的join可以判断数据是否在两个表中重复出现,
left join :判断null值的机会;
最后维护历史用户表:每天新增用户插入历史用户表。
基于visit表统计访问次数(session会话次数)大于1则称为回头访客。
指标计算完成之后指标存储在hive中,对接可视化需要把数据导出到mysql中,
sqoop,datax,kettle
使用kettle导出指标数据(hive)其它节点上的mysql表中
开发kettle的任务
输入组件:表输入
输出组件:表输出
进行数据数据的增量和全量导出可以使用sql语句灵活控制!!
离线工作强调的工作流的调度,因为离线分析T+1会有很多重复工作同时这些工作之间还有顺序要求,
数据预处理之后才能进行数据etl和数据指标。
引入调度:
crotab:自带的定时调度工具,轻量级同时缺陷也很明显,无法设置任务之间的依赖关系,
azkaban,oozie,airflow开源的调度系统,
我们使用azkaban进行调度。
数据采集:flume实时采集无需调度;
kettle采集的数据,每天需要批量采集数据需要调度
数据预处理:日志清洗+业务模型,需要调度
数据的etl:业务数据,日志数据都需要调度,
数据指标计算:业务数据,日志指标都需要调度
数据导出:需要调度。
azkaban部署情况:
node1:web服务器,
node2,node3 executserver节点。
1 编写job文件,type=command command=sh ,spark ,hadoop
2 打成zip包上传到azkaban新建的project
3 执行或者配置crontab 表达式
使用kettle采集数据,可以使用pan.sh或者kitchen.sh在linux中执行转化或者作业任务。
可以使用shell命令方式让azkaban进行调度。
执行kettle任务的脚本,
import.sh
如果使用shell脚本方式让azkaban调度,注意保证每个executor上都能找到该脚本文件。
测试课件上shell脚本还有azkabanjob文件,你需要把所有的脚本文件按照课件脚本中的目录放置到每个executorser的节点上。
job文件一定要是utf8无bom.
重点:
1 梳理数据倾斜的业务和实现思路,代码不要求。
2 工作流调度,shell脚本做一个要求可以不写但是必须能看懂。
间字段,可以直接在单表中group by搞定。
借助时间,日期维度表来进行时间维度分析。
单表中维度分析group by维度字段即可;
人均浏览数:总pv/访客数
平均访问频度:vv/uv
平均访问时长
跳出率
每个:分组;
最多的多少个:
组内排序之后的前几名。
topn问题
row_number() over(partition by xx order by xx) r:
按照某某分组按照某字段组内排序,序号是连续不会并列
rank() over():会有并列的问题,跳过重复
desen rank() over:会有并列问题,不跳过重复。
子查询也是使用非常方便,就是我们多次查询前面的查询结果后续查询会使用。
先计算每天的独立访客(新老顾客),
维护一张历史用户表
使用表的join可以判断数据是否在两个表中重复出现,
left join :判断null值的机会;
最后维护历史用户表:每天新增用户插入历史用户表。
基于visit表统计访问次数(session会话次数)大于1则称为回头访客。
指标计算完成之后指标存储在hive中,对接可视化需要把数据导出到mysql中,
sqoop,datax,kettle
使用kettle导出指标数据(hive)其它节点上的mysql表中
开发kettle的任务
输入组件:表输入
输出组件:表输出
进行数据数据的增量和全量导出可以使用sql语句灵活控制!!
离线工作强调的工作流的调度,因为离线分析T+1会有很多重复工作同时这些工作之间还有顺序要求,
数据预处理之后才能进行数据etl和数据指标。
引入调度:
crotab:自带的定时调度工具,轻量级同时缺陷也很明显,无法设置任务之间的依赖关系,
azkaban,oozie,airflow开源的调度系统,
我们使用azkaban进行调度。
数据采集:flume实时采集无需调度;
kettle采集的数据,每天需要批量采集数据需要调度
数据预处理:日志清洗+业务模型,需要调度
数据的etl:业务数据,日志数据都需要调度,
数据指标计算:业务数据,日志指标都需要调度
数据导出:需要调度。
azkaban部署情况:
node1:web服务器,
node2,node3 executserver节点。
1 编写job文件,type=command command=sh ,spark ,hadoop
2 打成zip包上传到azkaban新建的project
3 执行或者配置crontab 表达式
使用kettle采集数据,可以使用pan.sh或者kitchen.sh在linux中执行转化或者作业任务。
可以使用shell命令方式让azkaban进行调度。
执行kettle任务的脚本,
import.sh
如果使用shell脚本方式让azkaban调度,注意保证每个executor上都能找到该脚本文件。
测试课件上shell脚本还有azkabanjob文件,你需要把所有的脚本文件按照课件脚本中的目录放置到每个executorser的节点上。
job文件一定要是utf8无bom.
重点:
1 梳理数据倾斜的业务和实现思路,代码不要求。
2 工作流调度,shell脚本做一个要求可以不写但是必须能看懂。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。