General idea: structure the big data system as multiple layers, a three-tier architecture: batch layer, real-time (speed) layer, and serving layer.
https://blog.csdn.net/u013368491/article/details/71271864
How to determine the cluster size (mainly based on data volume)
Assuming each log record is about 1 KB, 100 million records per day, and no server expansion for half a year: 100,000,000 KB / 1024 / 1024 ≈ 100 GB per day, about 18 TB for half a year, roughly 54 TB with 3 replicas; adding a 20% buffer and estimating 8 TB of disk per server, about 9 servers are needed.
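A minimal Scala sketch of the same back-of-the-envelope estimate (the figures below are the assumptions stated above, not measurements):
object ClusterSizeEstimate {
  def main(args: Array[String]): Unit = {
    val halfYearTB   = 18.0                          // ~100 GB/day * 180 days
    val withReplicas = halfYearTB * 3                // 3 HDFS replicas -> 54 TB
    val withBuffer   = withReplicas * 1.2            // plus 20% headroom -> ~64.8 TB
    val servers      = math.ceil(withBuffer / 8.0)   // 8 TB of HDFS disk per server
    println(f"total ~ $withBuffer%.1f TB, servers ~ ${servers.toInt}%d") // -> about 9 servers
  }
}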
How to choose among the Apache, CDH, and HDP distributions?
Goal: give students an idea of how technology selection is done in a company and the general reasoning behind it.
1. Apache: operations are cumbersome and you have to research component compatibility yourself. (Generally used by large companies with strong engineering teams and dedicated operations staff.)
2. CDH: the most widely used distribution in China; Cloudera Manager is not open source, but in practice that does not matter for small and medium-sized companies (recommended).
3. HDP: open source and open to secondary development, but not as stable as CDH.
Big data service component conventions, example as follows (not imported yet due to formatting issues) TODO
Physical servers or cloud hosts?
Cost considerations:
1. Physical server: with 128 GB RAM, a 20-core / 40-thread physical CPU, 8 TB HDD and 2 TB SSD, a single machine is quoted at a little over 40,000 RMB, plus colocation fees; a physical server typically lasts about 5 years.
2. Cloud host: taking Alibaba Cloud as an example, a roughly comparable configuration costs about 50,000 RMB per year.
Operations cost considerations:
1. Physical servers: require dedicated operations staff.
2. Cloud hosts: much of the operations work is already done by Alibaba Cloud, so operations are relatively easy.
Prefer offline installation wherever possible.
Use a non-root user and configure passwordless sudo for it.
Confirm the HDFS storage directories and make sure they sit on the disks with the most space.
Back up the metadata (critical: if the metadata is corrupted the whole cluster may stop working; at a minimum, back it up as two copies to other servers every day after midnight).
Configure rack awareness.
Run benchmark tests.
Parameter tuning
1. dfs.namenode.handler.count = 20 * log2(Cluster Size); for example, for a 20-node cluster this comes to roughly 86 (see the sketch after this list).
The Hadoop RPC server consists of a single RPC queue per port and multiple handler (worker) threads that dequeue and process requests. If the number of handlers is insufficient, then the RPC queue starts building up and eventually overflows. You may start seeing task failures and eventually job failures and unhappy users.
It is recommended that the RPC handler count be set to 20 * log2(Cluster Size) with an upper limit of 200.
2. dfs.namenode.service.handler.count = half of the value above
There is no precise calculation for the Service RPC handler count however the default value of 10 is too low for most production clusters. We have often seen this initialized to 50% of the dfs.namenode.handler.count in busy clusters and this value works well in practice.
3. Keep dfs.namenode.edits.dir on separate storage from dfs.namenode.name.dir where possible, to minimize write latency.
4. dfs.namenode.accesstime.precision=0
The setting dfs.namenode.accesstime.precision controls how often the NameNode will update the last accessed time for each file. It is specified in milliseconds. If this value is too low, then the NameNode is forced to write an edit log transaction to update the file's last access time for each read request and its performance will suffer.
The default value of this setting is a reasonable 3600000 milliseconds (1 hour). We recommend going one step further and setting it to zero so last access time updates are turned off. Add the following to your hdfs-site.xml.
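A minimal Scala sketch of the handler-count rule of thumb from items 1 and 2 (20 * log2(cluster size), capped at 200, with the service handler count at half of that):
object NameNodeHandlerCount {
  // returns (dfs.namenode.handler.count, dfs.namenode.service.handler.count)
  def recommended(clusterSize: Int): (Int, Int) = {
    val handlers = math.min(200, math.round(20 * math.log(clusterSize) / math.log(2)).toInt)
    (handlers, handlers / 2)
  }

  def main(args: Array[String]): Unit = {
    println(recommended(20)) // (86,43) for a 20-node cluster
  }
}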
User behavior analysis means taking the basic traffic data of a website, app, or similar platform, aggregating and analyzing the relevant data to discover patterns in how users visit the platform, and then combining those patterns with the online marketing strategy, so as to uncover problems in current marketing activities and provide a basis for revising or redefining the strategy.
45660 45660 M 1 0 ios huawei wifi 59.48.116.0 18701445660 1 0 0 0 1.0 startHomework 1554134400
There are 20 application servers. Using Flume, design an architecture that collects data from the application servers' logs into HDFS in real time, guaranteeing that no data is lost along the way. Describe the architecture and write the Flume agent configuration.
Advantages of Taildir Source over Exec Source and Spooling Directory Source (a good highlight to mention when introducing yourself in an interview)
Before version 1.7, the only source for real-time log collection was Exec Source, but it may lose data (see the description on the official site).
To get real-time collection while keeping data safe, people had to roll a new log file every half minute or so and move it into the directory watched by Spooling Directory Source.
From version 1.7 on, Taildir Source is available: it collects data in real time while keeping it safe, with an internal checkpoint mechanism similar to resumable transfer.
How should batchSize be set? With events around 1 KB, 500-1000 is appropriate.
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.command = tail -F /home/hadoop/guolong.txt
a1.sources.r1.channels = c1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://atguigu:8020/guolong/%Y%m%d
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=TEXT
# Roll a new file every 600 seconds
a1.sinks.k1.hdfs.rollInterval=600
# Roll a new file every 128 MB
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0
# Write 1000 events to HDFS per batch
a1.sinks.k1.hdfs.batchSize=1000
a1.sinks.k1.hdfs.threadsPoolSize=16
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.filePrefix=guolong.%Y%m%d%H%M
a1.sinks.k1.hdfs.idleTimeout=600
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute
Notes:
Test launch command:
bin/flume-ng agent -n a1 (agent name) -c conf -f conf/example (configuration file name) -Dflume.root.logger=DEBUG,console
With hdfs.rollInterval=600, hdfs.rollSize=134217728, hdfs.roundValue=10, and hdfs.roundUnit=minute working together, the combined effect is:
1. A .tmp file is rolled into a final file when it reaches 128 MB.
2. A .tmp file is rolled into a final file once it has been open for more than 10 minutes.
For example: if the sink receives data at 2018-01-01 05:23, the following .tmp file is created:
/guolong/20180101/guolong.201801010520.tmp
Even if the file has not reached 128 MB, it is rolled into a final file at 05:33.
uid STRING comment "unique user id",
username STRING comment "user nickname",
gender STRING comment "gender",
level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
is_vip TINYINT comment "0 = not a VIP member, 1 = VIP member",
os STRING comment "operating system: ios, android, etc.",
channel STRING comment "download channel: auto, toutiao, huawei",
net_config STRING comment "current network type",
ip STRING comment "IP address",
phone STRING comment "phone number",
video_id INT comment "video id",
video_length INT comment "video length in seconds",
start_video_time BIGINT comment "timestamp (seconds) when the video was started",
end_video_time BIGINT comment "timestamp (seconds) when the video was exited",
version STRING comment "app version",
event_key STRING comment "event type",
event_time BIGINT comment "timestamp (seconds) when the event occurred"
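For reference, a hypothetical Scala case class mirroring the 17 fields above (the cleaning job below works on raw tab-separated strings, so this is only an illustration of the expected record layout):
case class UserBehavior(
  uid: String, username: String, gender: String,
  level: Int, isVip: Int, os: String, channel: String,
  netConfig: String, ip: String, phone: String,
  videoId: Int, videoLength: Int,
  startVideoTime: Long, endVideoTime: Long,
  version: String, eventKey: String, eventTime: Long)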
1. Clean the data with Spark Core and write it to the /user/hive/warehouse/tmp.db/user_behavior_${day} directory.
2. Create the temporary table tmp.user_behavior_${day} and load the cleaned data from above.
3. Using the Hive engine and the window function row_number, insert the data from tmp.user_behavior_${day} into the dwd.user_behavior table.
4. Drop the temporary table tmp.user_behavior_${day}.
inputPath:/user/hive/warehouse/ods.db/origin_user_behavior/${day}
outputPath:/user/hive/warehouse/tmp.db/user_behavior_${day}
package com.atguigu.user_behavior
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * User behavior data cleaning
 * 1. Validate the record format: a record must split into exactly 17 fields
 * 2. Mask the phone number into the form 123xxxx4567
 * 3. Strip any \n from username, which would otherwise produce extra line breaks when writing to HDFS
 */
object UserBehaviorCleaner {
def main(args : Array[String]): Unit ={
if(args.length != 2){
println("Usage:please input inputPath and outputPath")
System.exit(1)
}
// Get the input and output paths
val inputPath = args(0)
val outputPath = args(1)
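// Note: setMaster("local[2]") below is for local testing; remove it (or make it configurable) when submitting with --master yarn, since a master set in code overrides the spark-submit flag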
val conf = new SparkConf().setAppName(getClass.getSimpleName).setMaster("local[2]")
val sc = new SparkContext(conf)
// Build an RDD from the input path
val eventRDD: RDD[String] = sc.textFile(inputPath)
// Clean the data; avoid putting large amounts of business logic inside operators, encapsulate it in methods instead
eventRDD.filter(event => checkEventValid(event)) // validate record format
.map( event => maskPhone(event)) // mask phone numbers
.map(event => repairUsername(event)) // fix usernames containing \n that would otherwise break lines
.coalesce(3)
.saveAsTextFile(outputPath)
sc.stop()
}
/**
* username is user-defined and may contain "\n", which causes line breaks when writing to HDFS
* @param event
*/
def repairUsername(event : String)={
val fields = event.split("\t")
// extract the user nickname
val username = fields(1)
// strip "\n" when the nickname is not empty
if(username != "" && !"Null".equals(username)){
fields(1) = username.replace("\n","")
}
fields.mkString("\t")
}
/**
* Mask the phone number
* @param event
*/
def maskPhone(event : String): String ={
var maskPhone = new StringBuilder
val fields: Array[String] = event.split("\t")
// extract the phone number
val phone = fields(9)
// mask the phone number when it is not empty
if(phone != null && !"".equals(phone)){
maskPhone = maskPhone.append(phone.substring(0,3)).append("xxxx").append(phone.substring(7,11))
fields(9) = maskPhone.toString()
}
fields.mkString("\t")
}
/**
* Validate the record format; only records that split into exactly 17 fields are valid
* @param event
*/
def checkEventValid(event : String) ={
val fields = event.split("\t")
fields.length == 17
}
}
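A quick local check of the helper methods above (a hypothetical snippet, assuming UserBehaviorCleaner is on the classpath; the phone number is taken from the sample record shown earlier):
import com.atguigu.user_behavior.UserBehaviorCleaner

object UserBehaviorCleanerCheck {
  def main(args: Array[String]): Unit = {
    // build a dummy record with 17 tab-separated fields; the phone number sits at index 9
    val sample = (0 until 17).map(i => if (i == 9) "18701445660" else s"f$i").mkString("\t")
    println(UserBehaviorCleaner.checkEventValid(sample))          // true
    println(UserBehaviorCleaner.maskPhone(sample).split("\t")(9)) // 187xxxx5660
  }
}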
spark-submit --master local[2] --class com.xxx.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}
spark-submit --master yarn --deploy-mode cluster --num-executors 8 --executor-cores 4 --executor-memory 12G \
--class com.atguigu.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \
hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} \
hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}
Scenario: in real work you will often run into cases where a job works fine locally but produces wrong data when run on the servers.
IDEA setup: Run --> Edit Configurations, add a Remote configuration.
Add the --driver-java-options parameter to the submit script.
spark-submit --master local[2] --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=18888" --class com.xxx.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar hdfs://xxx:8020/user/hive/warehouse/ods.db/origin_user_behavior/${day} hdfs://xxx:8020/user/hive/warehouse/tmp.db/user_behavior_${day}
create table if not exists tmp.user_behavior_${day}(
uid STRING comment "unique user id",
username STRING comment "user nickname",
gender STRING comment "gender",
level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
is_vip TINYINT comment "0 = not a VIP member, 1 = VIP member",
os STRING comment "operating system: ios, android, etc.",
channel STRING comment "download channel: auto, toutiao, huawei",
net_config STRING comment "current network type",
ip STRING comment "IP address",
phone STRING comment "phone number",
video_id INT comment "video id",
video_length INT comment "video length in seconds",
start_video_time BIGINT comment "timestamp (seconds) when the video was started",
end_video_time BIGINT comment "timestamp (seconds) when the video was exited",
version STRING comment "app version",
event_key STRING comment "event type",
event_time BIGINT comment "timestamp (seconds) when the event occurred")
row format delimited fields terminated by "\t"
location "/user/hive/warehouse/tmp.db/user_behavior_${day}";
Questions to think about:
1. Why do level and is_vip use TINYINT instead of INT?
2. Why is the partition field dt stored as an int, e.g. 20190408, rather than the string '2019-04-08'?
Note: the start_video_time and end_video_time fields are only sent when event_key is endVideo.
create external table if not exists dwd.user_behavior(
uid STRING comment "unique user id",
username STRING comment "user nickname",
gender STRING comment "gender",
level TINYINT comment "1 = primary school, 2 = junior high, 3 = senior high",
is_vip TINYINT comment "0 = not a VIP member, 1 = VIP member",
os STRING comment "operating system: ios, android, etc.",
channel STRING comment "download channel: auto, toutiao, huawei",
net_config STRING comment "current network type",
ip STRING comment "IP address",
phone STRING comment "phone number",
video_id INT comment "video id",
video_length INT comment "video length in seconds",
start_video_time BIGINT comment "timestamp (seconds) when the video was started",
end_video_time BIGINT comment "timestamp (seconds) when the video was exited",
version STRING comment "app version",
event_key STRING comment "event type",
event_time BIGINT comment "timestamp (seconds) when the event occurred") partitioned by(dt INT)
row format delimited fields terminated by "\t" stored as ORC
Interview questions:
Choosing among Textfile, Parquet, and ORC (interview highlight: after switching to ORC, storage shrank by about 90% and queries ran 3-5x faster)
How are external tables and managed (internal) tables used in a company?
insert overwrite table dwd.user_behavior partition(dt=${day})
select
uid,
username,
gender,
level,
is_vip,
os,
channel,
net_config,
ip,
phone,
video_id,
video_length,
start_video_time,
end_video_time,
version,
event_key,
event_time
from (
select
uid,
username,
gender,
level,
is_vip,
os,
channel,
net_config,
ip,
phone,
video_id,
video_length,
start_video_time,
end_video_time,
version,
event_key,
event_time,
row_number() OVER (PARTITION BY uid,event_key,event_time ORDER BY event_time) u_rank
from tmp.user_behavior_${day}
) temp where u_rank = 1
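The same deduplication can also be sketched with the Spark DataFrame API (a minimal sketch, assuming the temporary table from step 2 exists and a Hive-enabled SparkSession; it mirrors the row_number() query above rather than replacing it):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

object UserBehaviorDedupSketch {
  def main(args: Array[String]): Unit = {
    val day = args(0) // e.g. 20190402
    val spark = SparkSession.builder()
      .appName(getClass.getSimpleName)
      .enableHiveSupport()
      .getOrCreate()

    // keep one row per (uid, event_key, event_time)
    val w = Window.partitionBy("uid", "event_key", "event_time").orderBy("event_time")
    spark.table(s"tmp.user_behavior_$day")
      .withColumn("u_rank", row_number().over(w))
      .where("u_rank = 1")
      .drop("u_rank")
      .createOrReplaceTempView("user_behavior_deduped")

    spark.sql(s"insert overwrite table dwd.user_behavior partition(dt=$day) " +
      "select * from user_behavior_deduped")
    spark.stop()
  }
}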
Note: each event_key represents one type of behavior.
Number of users who watched a video today
Number of users who completed a video today
CREATE TABLE app_cource_study_report (
`id` int(11) NOT NULL AUTO_INCREMENT,
`watch_video_cnt` int(11) DEFAULT NULL,
`complete_video_cnt` int(11) DEFAULT NULL,
`dt` int(11) DEFAULT NULL,
`created_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `app_cource_study_report_dt` (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
Always include created_at and updated_at columns when creating the table; they make it easier to trace problems after a script is rerun.
create table if not exists tmp.app_cource_study_analysis_${day}(
watch_video_count INT,
complete_video_count INT,
dt INT
) row format delimited fields terminated by "\t";
insert overwrite table tmp.app_cource_study_analysis_${day}
select sum(watch_video_count),sum(complete_video_count),dt from (
select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = "startVideo" group by dt
union all
select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = "endVideo"
and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt
Key points:
When counting users, deduplicate by the unique user id uid.
Why use overwrite rather than into when inserting?
Interview question: the difference between union and union all.
app_course_study_analysis.sh
#! /bin/bash
day=$1
# Validate the input argument
if [ ${#day} -ne 8 ];then
echo "Please input date,eg:20190402"
exit 1
fi
# Create the temporary table
hive -e "
create table if not exists tmp.app_cource_study_analysis_${day}(
watch_video_count INT,
complete_video_count INT,
dt INT
) row format delimited fields terminated by '\t';"
# Insert data into the temporary table
hive -e "
insert overwrite table tmp.app_cource_study_analysis_${day}
select sum(watch_video_count),sum(complete_video_count),dt from (
select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'startVideo' group by dt
union
select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'endVideo'
and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt
"
package com.atguigu.user_behavior
import org.apache.spark.sql.SparkSession
object AppCourseStudyAnalysis {
def main(args: Array[String]): Unit = {
// Get and validate the date argument
val day = args(0)
if("".equals(day) || day.length() != 8){
println("Usage:Please input date,eg:20190402")
System.exit(1)
}
// Build a SparkSession with Hive support
val spark: SparkSession = SparkSession.builder()
.appName(this.getClass.getSimpleName)
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.enableHiveSupport()
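// local[2] below is for local testing; drop it (or make the master configurable) when submitting to YARN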
.master("local[2]")
.getOrCreate()
import spark.sql
// Create the temporary table
sql(s"""
|create table if not exists tmp.app_cource_study_analysis_${day}(
|watch_video_count INT,
|complete_video_count INT,dt INT)
|row format delimited fields terminated by '\t'
""".stripMargin)
// Insert the analysis result into the temporary table
sql(
s"""
|insert overwrite table tmp.app_cource_study_analysis_${day}
|select sum(watch_video_count),sum(complete_video_count),dt from (
|select count(distinct uid) as watch_video_count,0 as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'startVideo' group by dt
|union
|select 0 as watch_video_count,count(distinct uid) as complete_video_count,dt from dwd.user_behavior where dt = ${day} and event_key = 'endVideo'
|and (end_video_time - start_video_time) >= video_length group by dt) tmp group by dt
""".stripMargin)
spark.stop()
}
}
Question to think about:
Spark SQL could write the analysis result directly to MySQL, whereas here we write the result to Hive with Spark SQL and then export it to MySQL with Sqoop. Which approach is better?
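For comparison, a minimal sketch of the direct-to-MySQL alternative mentioned above (connection details reuse the MySQL instance from the Sqoop command below; the column renames are illustrative and the MySQL JDBC driver must be on the classpath):
import java.util.Properties
import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteReportToMysqlSketch {
  def main(args: Array[String]): Unit = {
    val day = args(0)
    val spark = SparkSession.builder()
      .appName(getClass.getSimpleName)
      .enableHiveSupport()
      .getOrCreate()

    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")

    // rename Hive columns to the MySQL column names, then append to the report table
    spark.table(s"tmp.app_cource_study_analysis_$day")
      .withColumnRenamed("watch_video_count", "watch_video_cnt")
      .withColumnRenamed("complete_video_count", "complete_video_cnt")
      .write.mode(SaveMode.Append)
      .jdbc("jdbc:mysql://192.168.137.10:3306/user_behavior", "app_cource_study_report", props)
    spark.stop()
  }
}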
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --input-null-string '\\N'
Traffic by version
iOS traffic
Android traffic
Traffic from other operating systems
create table if not exists tmp.app_version_analysis_${day}(
os STRING,
version STRING,
access_count INT,
dt INT
) row format delimited fields terminated by "\t"
insert overwrite table tmp.app_version_analysis_${day}
select os,version,count(1) as access_count,dt from dwd.user_behavior where dt = ${day} group by os,version,dt;
Key points:
Do not use select * or count(*) in SQL statements.
Number of new users yesterday
New users yesterday by channel
Organic (non-channel) new users yesterday
create table if not exists tmp.app_channel_analysis_${day}(
channel STRING,
new_user_cnt INT,
dt INT
) row format delimited fields terminated by "\t"
insert overwrite table tmp.app_channel_analysis_${day}
select channel,count(distinct uid),dt from dwd.user_behavior where dt = ${day} and event_key = "registerAccount" group by channel,dt;
Visited 1-2 times (2 inclusive)
Visited 3-4 times (4 inclusive)
Visited more than 4 times
create table if not exists tmp.app_access_cnt_ranger_analysis_${day}(
le_two INT,
le_four INT,
gt_four INT,
dt INT
) row format delimited fields terminated by "\t";
Step 1: compute each user's visit count for the day.
drop table if exists tmp.user_access_cnt_${day};
create table if not exists tmp.user_access_cnt_${day} as select uid,count(1) as access_cnt,dt from dwd.user_behavior where dt = ${day} group by uid,dt;
Step 2: compute the distribution of users by visit count and insert it into the final table.
insert overwrite table tmp.app_access_cnt_ranger_analysis_${day}
select sum(le_two) as le_two,sum(le_four) as le_four,sum(gt_four) as gt_four,dt from
(select count(1) as le_two,0 as le_four,0 as gt_four,dt from tmp.user_access_cnt_${day} where access_cnt <= 2 group by dt
union all
select 0 as le_two,count(1) as le_four,0 as gt_four,dt from tmp.user_access_cnt_${day} where access_cnt > 2 and access_cnt <= 4 group by dt
union all
select 0 as le_two,0 as le_four,count(1) as gt_four,dt from tmp.user_access_cnt_${day} where access_cnt > 4 group by dt) tmp
group by dt;
Open app -> start video -> complete video -> start homework -> complete homework
A user can only complete a video after watching it, and only then start homework (which they may also skip); starting homework does not guarantee completing it, so users drop off at every step.
create table if not exists tmp.app_study_funnel_analysis_${day}(
start_app_cnt INT,
start_video_cnt INT,
complete_video_cnt INT,
start_homework_cnt INT,
complete_homework INT,
dt INT
) row format delimited fields terminated by "\t";
insert overwrite table tmp.app_study_funnel_analysis_${day}
select count(distinct t1.uid) as start_app_cnt,count(distinct t2.uid) as start_video_cnt,count(distinct t3.uid) as complete_video_cnt,count(distinct t4.uid) as start_homework,count(distinct t5.uid) as complete_homework,t1.dt from
(select uid,dt from dwd.user_behavior where dt = ${day} and event_key = "startApp") t1
left join
(select uid from dwd.user_behavior where dt = ${day} and event_key = "startVideo") t2
on t1.uid = t2.uid
left join
(select uid from dwd.user_behavior where dt = ${day} and event_key = "endVideo" and (end_video_time - start_video_time) >= video_length) t3
on t2.uid = t3.uid
left join
(select uid from dwd.user_behavior where dt = ${day} and event_key = "startHomework") t4
on t3.uid = t4.uid
left join
(select uid from dwd.user_behavior where dt = ${day} and event_key = "completeHomework") t5
on t4.uid = t5.uid group by t1.dt
Questions to think about:
1. How would you build a time-ordered funnel, with the whole sequence required to happen within 2 hours?
2. What if each step must occur within 15 minutes of the previous one?
create table tmp.seven_days_retained_analysis_${day}(
register_day INT,
zero_interval_retained_rate DOUBLE,
one_interval_retained_rate DOUBLE,
two_interval_retained_rate DOUBLE,
three_interval_retained_rate DOUBLE,
four_interval_retained_rate DOUBLE,
five_interval_retained_rate DOUBLE,
six_interval_retained_rate DOUBLE,
dt INT
) row format delimited fields terminated by "\t";
// Get the registration date of every user who registered within the last 7 days
select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount"
// Get the list of users active on each of the last 7 days
select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt
// Join the two to produce uid, register_day, active_day, and the interval (number of days between registration and the active day)
select t1.uid,t1.register_day,t2.active_day,datediff(from_unixtime(t2.event_time,"yyyy-MM-dd"),from_unixtime(t1.event_time,"yyyy-MM-dd")) as day_interval from
(select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount") t1
left join
(select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt) t2
on t1.uid = t2.uid
Result format:
001 20190301 20190301 0
001 20190301 20190303 2
002 20190302 20190303 1
// From the result above, build a temporary table of retained-user counts
drop table if exists tmp.user_retained_${startDay}_${endDay};
create table if not exists tmp.user_retained_${startDay}_${endDay} as
select register_day,day_interval,count(1) as retained from (
select t1.uid,t1.register_day,t2.active_day,datediff(from_unixtime(t2.event_time,"yyyy-MM-dd"),from_unixtime(t1.event_time,"yyyy-MM-dd")) as day_interval from
(select uid,dt as register_day,event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount") t1
left join
(select uid,dt as active_day,max(event_time) as event_time from dwd.user_behavior where dt between ${startDay} and ${endDay} group by uid,dt) t2
on t1.uid = t2.uid) tmp group by register_day,day_interval
Sample result:
20190402 0 27000
20190402 1 19393
20190402 2 14681
20190402 3 9712
20190402 4 5089
20190402 5 1767
20190402 6 1775
// Compute the 7-day retention rates
drop table if exists tmp.user_retained_rate_${startDay}_${endDay};
create table if not exists tmp.user_retained_rate_${startDay}_${endDay} as
select register_day,day_interval,round(retained / register_cnt,4) as retained_rate,current_dt from (
select t1.register_day,t1.day_interval,t1.retained,t2.register_cnt,${endDay} as current_dt from
(select register_day,day_interval,retained from tmp.user_retained_${startDay}_${endDay}) t1
left join
(select dt,count(1) as register_cnt from dwd.user_behavior where dt between ${startDay} and ${endDay} and event_key = "registerAccount" group by dt) t2
on t1.register_day = t2.dt
group by t1.register_day,t1.day_interval ,t1.retained,t2.register_cnt) tmp
Sample result:
20190402 0 1.0 20190402
20190402 1 0.7183 20190402
20190402 2 0.5437 20190402
20190402 3 0.3597 20190402
20190402 4 0.1885 20190402
20190402 5 0.0654 20190402
20190402 6 0.0657 20190402
20190403 0 1.0 20190402
20190403 1 0.7183 20190402
20190403 2 0.5437 20190402
20190403 3 0.3597 20190402
20190403 4 0.1885 20190402
20190403 5 0.0654 20190402
// We're not done yet: finally, pivot the day_interval rows into columns
insert overwrite table tmp.seven_days_retained_analysis_${day}
select
register_day,
max(case when day_interval = 0 then retained_rate else 0 end) as zero_interval_retained_rate,
max(case when day_interval = 1 then retained_rate else 0 end) as one_interval_retained_rate,
max(case when day_interval = 2 then retained_rate else 0 end) as two_interval_retained_rate,
max(case when day_interval = 3 then retained_rate else 0 end) as three_interval_retained_rate,
max(case when day_interval = 4 then retained_rate else 0 end) as four_interval_retained_rate,
max(case when day_interval = 5 then retained_rate else 0 end) as five_interval_retained_rate,
max(case when day_interval = 6 then retained_rate else 0 end) as six_interval_retained_rate,
current_dt
from tmp.user_retained_rate_${startDay}_${endDay} group by register_day,current_dt;
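A small sketch (hypothetical driver code) showing how the ${startDay}/${endDay} placeholders for the 7-day window might be derived before running the statements above:
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object RetainedWindow {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  // e.g. sevenDayWindow("20190408") == ("20190402", "20190408")
  def sevenDayWindow(endDay: String): (String, String) = {
    val end = LocalDate.parse(endDay, fmt)
    (end.minusDays(6).format(fmt), endDay)
  }

  def main(args: Array[String]): Unit = {
    val (startDay, endDay) = sevenDayWindow(args(0))
    println(s"startDay=$startDay endDay=$endDay")
  }
}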
Question to think about: all of the SQL above could be written as one big SQL statement, so why break it into so many steps?
Problem: if previously exported data turns out to be wrong and the Sqoop export script is rerun, what happens?
Answer: MySQL ends up with new rows that have the same dt but different statistics, leaving the data in a mess!
How to fix it:
1. Use upsert mode for the Sqoop export
sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior --username root --password 123456 --table app_cource_study_report --columns watch_video_cnt,complete_video_cnt,dt --fields-terminated-by "\t" --export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" --update-key dt --update-mode allowinsert --input-null-string '\\N'
Prerequisites for this mode:
1. The column passed to --update-key must have a unique index
create unique index app_cource_study_report_dt on app_cource_study_report (dt);
2. Also, for the updated_at column to update automatically, configure it to auto-update on modification:
ALTER TABLE app_cource_study_report MODIFY updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL;
Running the Sqoop export again, you may notice that updated_at does not change even though the data was indeed written. This is a common misconception: updated_at only changes when a record's values are actually modified. Test it again with the following statement:
UPDATE app_cource_study_report SET watch_video_cnt = 88888
Unfortunately, upsert mode and the staging-table mode are incompatible; you can only use one of them, and --staging-table is generally the more common choice.