赞
踩
周末两天,整理了近些年从事大数据工作经验以及调优,越是到深水域,越发感觉知识的无穷尽。更深切的理解 Stay Hungry,Stay Foolish。
1.联机事务处理 OLTP(on-line transaction processing)
OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易。
2.联机分析处理 OLAP(On-Line Analytical Processing)
OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。
区别
OLTP就是面向我们的应用系统数据库的,OLAP是面向数据仓库的。
即席查询(Ad Hoc)是用户根据自己的需求,灵活的选择查询条件,系统能够根据用户的选择生成相应的统计报表。
ODS operation data store 原始数据。直接加载原始日志,数据,数据保存原貌不作处理
dwd data warehouse detail 结构和粒度与原始表保持一致。对ODS层数据进行清洗[去除空值,脏数据,超过极限范围的数据]
dws data warehouse service 以dwd为基础,进行轻度汇总。此层通常会以一个维度为线索,进行跨主题的宽表
ads application data store 为各种“统计”报表提供数据。表是关于特定主题最终汇总后的结果,表中的每个字段就是统计指标。
大数据公司的业务分层 -- ods dwd dwa [面向主题域的粗粒度汇总]
星型模型 vs 雪花模型 区别 -- [一层维度 vs 多层维度]
维度的层级,标准的星型模型只有一层,而雪花模型可能会涉及多层
性能优先选择星型模型,灵活优先选择雪花模型。企业中星型模型多一些。
模型选择
星型 vs 雪花 取决于性能优先,还是灵活优先
实际开发,可能并存。但整体来看,更倾向于维度更少的【星型模型】。尤其是Hadoop体系,减少join就是减少shuffle,提升性能。
关系型数据库可以依靠强大的主键索引。
星型模型(一级维度表),雪花(多级维度),星座模型(星型模型+多个事实表)
星型模型
有一张事实表,周围环绕着一系列的维度表组合而成的表结构。
举例 事实表:销售数量,销售金额
维度表:部门维度[总分公司]、地域维度[省市区]、产品维度、时间维度[年月日]
雪花模型
有一张事实表,周围环绕着一系列的维度表,维度表中复杂的字段通过其他的维度表进行详细描述,这样组合而成的表结构
区别 维度表的字段很复杂,需要对维度表的字段进行说明 【维度表 针对 维度表进行说明】
举例 第二个维度表进行二次说明 针对 地域维度[省市区]进行维度说明
国家 国家维 国家键 国家名称
省份 省份维 省份键 省份名称
地市 地市维 地市键 地市名称
星座模型 星系模型 [用的少]
有多张事实表,每张事实表周围围绕着一系列的维度表,维度表又可以拥有子维度表
星座模型和前两种情况的区别是事实表的数量,星座模型是基于多个事实表
星座模型并不和另2个模型冲突。多个事实表,他们之间共享一些维度表。
大白话:有多张事实表[其他模型只有一张事实表];有多张维度表,维度表又有多张子维度表
属性 | 星型模型 | 雪花模型 |
数据总量 | 多 | 少 |
可读性 | 容易 | 差 |
表个数 | 少 | 多 |
查询速度 | 快 | 慢 |
冗余度 | 高 | 低 |
对实时表的情况 | 增加宽度 | 字段比较少,冗余底 |
扩展性 | 差 | 好
|
应用场景
星型模型的设计方式主要带来的好处是能够提升查询效率,因为生成的事实表已经经过预处理,主要的数据都在事实表里面,所以只要扫描实时表就能够进行大量的查询,而不必进行大量的join,其次维表数据一般比较少,在join可直接放入内存进行join以提升效率,除此之外,星型模型的事实表可读性比较好,不用关联多个表就能获取大部分核心信息,设计维护相对比较简答。
雪花模型的设计方式是比较符合数据库范式的理念,设计方式比较正规,数据冗余少,但在查询的时候可能需要join多张表从而导致查询效率下降,此外规范化操作在后期维护比较复杂。
总结
通过上面的对比,我们可以发现数据仓库大多数时候是比较适合使用星型模型构建底层数据Hive表,通过大量的冗余来提升查询效率,星型模型对OLAP的分析引擎支持比较友好,这一点在Kylin中比较能体现。而雪花模型在关系型数据库中如MySQL,Oracle中非常常见,尤其像电商的数据库表。在数据仓库中雪花模型的应用场景比较少,但也不是没有,所以在具体设计的时候,可以考虑是不是能结合两者的优点参与设计,以此达到设计的最优化目的。
关于数据存储,本人在快手的面试中曾经遇到,例如 男1 女0 等用0 1标识 。以此类推。
引入目的 -- 增加了开始时间,结束时间 ,对新增及变化表做定期合并
a.通过关系型数据库的create time和operation time获取数据的新增和变化。
b.用临时拉链表解决Hive了中数据不能更新的问题。
解决了什么问题?
缓慢变化维:偶尔变化一次 一个月2个月变化一次
实现方式
记录每条信息的生命周期,一旦一条记录的生命周期结束,就重新开始一条新的记录,并把当前日期放入生效开始日期
如果当前信息至今有效,在生效结束日期中填入一个极大值[如 9999-99-99,极大值永远也到不了]
使用场景
拉链表适合于:数据会发生变化,但是大部分是不变的 【即缓慢变化维】
附;数据不变化的 -- 事务性事实表,一旦产生就不会变化,例如 身份证-姓名对应关系
用户ID | 姓名 | 手机号码 | 开始日期 | 结束时间 |
1 | 张三 | 135****135 | 2019-01-01 | 2019-01-01 |
1 | 张三 | 176****176 | 2019-01-02 | 2019-01-09 |
1 | 张三 | 189****189 | 2019-01-10 | 9999-99-99 |
用户信息会发生变化,但是每天变化的比例不高。每当数据发生变化时记录下来,并记录数据变化的起始时间
反面案例:如果数据量有一定规模,按照每日全量的方式保存效率很低。比如:1亿用户*365天,每天一份用户信息。[做每日全量效率低]
但是如果每天发生变化,不如做全量更新。经常变化的做全量。只有缓慢变化的,才去选型做拉链表,节省存储空间
案例分析:(以下案例来源于网上)
拉链表的形成
1. 初始化:每条数据加上 开始日期 结束时间 [全量]导入Hive得到初始的拉链表
2. 得到增量变化表。当发生修改、增加:过滤出修改、增加变化的数据 得到用户变化表
3. 将初始的拉链表 和 用户变化表合并 [得到临时表,并overwrite覆盖]
新增的数据只是加上 开始日期 结束时间
更新的数据 a.用left join把结束时间更新 b.更新后的数据仍然要保留
1.初始化拉链表 -- dwd_order_info_his 拼接开始-结束日期
`start_date` string COMMENT '有效开始日期',
`end_date` string COMMENT '有效结束日期'
- insert overwrite table dwd_order_info_his
- select
- id,
- total_amount,
- order_status,
- user_id,
- payment_way,
- out_trade_no,
- create_time,
- operate_time,
- '2019-01-01',
- '9999-99-99'
- from ods_order_info oi
- where oi.dt='2019-01-01';
2.得到增量变化表 dwd_order_info
3.先合并变动信息,再追加新增信息,插入到临时表中,临时表覆盖给拉链表
- insert overwrite table dwd_order_info_his_tmp
- select * from
- (
- select
- id,
- total_amount,
- order_status,
- user_id,
- payment_way,
- out_trade_no,
- create_time,
- operate_time,
- '2019-01-02' start_date,
- '9999-99-99' end_date
- from dwd_order_info where dt='2019-01-02'
-
- union all
- select oh.id,
- oh.total_amount,
- oh.order_status,
- oh.user_id,
- oh.payment_way,
- oh.out_trade_no,
- oh.create_time,
- oh.operate_time,
- oh.start_date,
- if(oi.id is null, oh.end_date, date_add(oi.dt,-1)) end_date
- from dwd_order_info_his oh left join
- (
- select
- *
- from dwd_order_info
- where dt='2019-01-02'
- ) oi
- on oh.id=oi.id and oh.end_date='9999-99-99'
- )his
- order by his.id, start_date;
步骤总结
1. 初始化拉链表 拼接开始-结束日期
2.得到增量表
3.增量表 拼接开始-结束日期
union all
(拉链表 left join 增量表) 关联上的更新结束日期
1.shffle优化
a.增加环形缓冲区大小 默认100m -》200m [快排,对key进行排排,按照字典顺序]
b.溢写比例 80%-》90%
c.溢写会产生N个文件,要进行文件归并,一次归并文件个数默认10,可以改为20。[机器性能OK]
2.为了减少网络传输,启用压缩
map之前 适合可切片/切分的压缩 lzo gz2
map输出 为了快,不涉及切片 snappy压缩 lzo
reduce输出 如果想永久保存。可以慢,要压缩得小 gzip
注:hadoop 默认不支持 LZO,如果需要支持LZO压缩,需要添加jar包,并在core-site.xml文件中添加相关的压缩配置
mapTask 内存默认1G 开发可以调到4--6G 每个mapTask默认1个cpu核心
reduceTask 内存默认 1G 每个reduceTask默认1个核心
reduce端拉取 默认一次拉取5个分区,比如拉取10个;拉取的数据放到内存中,内存可以调大,内存可以调大
3.hadoop小文件优化
a.文件归档
b.自定义FileInputFormat进行小文件合并,sequenceFile文件存储
c.combinerInputFormat 改变切片,解决输入端大量小文件场景。 在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能。
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
d.采用JVM重用,适合小文件多的情况,频繁开启。【DD提高性能50%】
执行每个小文件都要频繁打开文件句柄,JVM频繁开关,开关时间比运行时间还要长。开启JVM重用,JVM一直开
4.内部表 vs 外部表
数仓中各层建的表都是外部表。
删除数据时
a.内部表会直接删除元数据(metadata)及存储数据
b.外部表只是删除元数据,并没有删除HDFS原始数据
表是外部表,所以Hive并非认为其完全拥有这份数据。删除该表并不会删除掉这份数据,不过描述表的元数据信息会被删除掉。
删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除;
对内部表的修改会将修改直接同步给元数据,而对外部表的表结构和分区进行修改,则需要修复(MSCK REPAIR TABLE table_name;)
使用场景
内部表 在创建临时表的时候使用,只有本人一人使用。只有自己建的临时表
绝大部分都是外部表,供多人使用
未被external修饰的是内部表(managed table),被external修饰的为外部表(external table);
区别:
内部表数据由Hive自身管理,外部表数据由HDFS管理;
内部表数据存储的位置是hive.metastore.warehouse.dir(默认:/user/hive/warehouse),外部表数据的存储位置由自己制定(如果没有LOCATION,Hive将在HDFS上的/user/hive/warehouse文件夹下以外部表的表名创建一个文件夹,并将属于这个表的数据存放在这里);
删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除;
对内部表的修改会将修改直接同步给元数据,而对外部表的表结构和分区进行修改,则需要修复(MSCK REPAIR TABLE table_name;)
5、数仓中使用的哪种文件存储格式
常用的包括:textFile, ORC,Parquet,一般企业里使用ORC或者Parquet,因为是列式存储,且压缩比非常高,所以相比于textFile,查询速度快,占用硬盘空间少
ODS层采用什么压缩方式和存储格式?
压缩采用Snappy,存储采用orc,压缩比是100g数据压缩完10g左右。
6.元数据的备份 - 备份的是mysql中的数据
元数据备份(重点,如数据损坏,可能整个集群无法运行,至少要保证每日零点之后备份到其它服务器两个复本)
hive的元数据默认derby,一般改到mysql [要支持多个客户端访问]
通过keepalive自动切换mysql master 和mysql slave的主从状态
7.客户端找MetaData,然后访问DataNode过程。
Hive主要做的事,顺序:解析器->编译器->优化器->执行器
1.解析器看SQL语法,语法通过了去编译器。
2.编译器负责翻译,把HQL翻译成MR任务,然后交给优化器。
3.优化器对MR任务进行优化。
4.最后交给执行器执行任务。
8.Hive中的元数据 Metastore
表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;
默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore
9.行式存储 vs 列式存储
行存储:textFile、sequence。查询满足条件的一整行数据的时候,列存储则需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到其中一个值,其余的值都在相邻地方,所以此时行存储查询的速度更快。
列存储:orc、parquet。因为每个字段的数据聚集存储,在查询只需要少数几个字段的时候,能大大减少读取的数据量;每个字段的数据类型一定是相同的,列式存储可以针对性的设计更好的设计压缩算法。
1.大小表join
2.mapjoin
3.count(distinct) --> group by
4.动态分区
5.数据倾斜
map端
小文件合并 CombineHiveInputFormat
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
reduce端
6.并行执行
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
7.严格模式
对分区表的查询必须使用到分区相关的字段
hive.mapred.mode值为默认是非严格模式nonstrict
使用了order by语句的查询,要求必须使用limit语句。
限制笛卡尔积的查询。
8.JVM 重用
应用场景
小文件的场景或task特别多的场景,这类场景大多数执行时间都很短。
JVM重用可以使得JVM实例在同一个job中重新使用N次。
开启JVM重用将一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡的”job中有某几个reduce task执行的时间要比其他Reduce task消耗的时间多的多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
9.推测执行
推测执行(Speculative Execution)机制,推测出“慢作业”任务,并为这样的任务启动一个备份任务,让该任务与原始任务同时处理同一份数据,并最终选用最先成功运行完成任务的计算结果作为最终结果。
缺点
用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。
10.存储
行式存储:textFile、sequence
列式存储:orc、parquet
压缩比:ORC > Parquet > textFile
ORC (Optimized Row Columnar)
默认是每隔1W行做一个索引(Index Data),然后对这些行的数据(Row Data)按列进行存储
11.压缩
hive表的数据存储格式一般选择:orc或parquet。压缩方式一般选择snappy,lzo。
12.执行计划 Expalin
1.Order By:全局排序,一个Reducer
2.Sort By:每个Reducer内部进行排序,对全局结果集来说不是排序。
3.Distribute By:类似MR中partition,进行分区,结合sort by使用[MR中的分区字段和排序字段可以不是一个]。
注意,Hive要求DISTRIBUTE BY语句要写在SORT BY语句之前。
按照部门分区,按照薪水排序
insert overwrite local directory '/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc;
4.Cluster By
当 distribute by和sorts by字段相同时,可以使用cluster by方式。
cluster by除了具有distribute by的功能外还兼具sort by的功能。但是排序只能是倒序排序,不能指定排序规则为ASC或者DESC。
select * from emp distribute by deptno sort by deptno;
分桶的规则,就类似于 mapreduce 中的 默认分区规则: HashPartitioner
- // 多个reduceTask,每个reduceTask分别有序
- set mapreduce.job.reduces=3;
- drop table student_orderby_result;
- create table student_orderby_result as
- select * from student
- distribute by (case when age > 20 then 0 when age < 18 then 2 else 1 end) sort by (age desc);
以下案例来源于网上,仅供自己学习使用,如有侵权,请及时周知。
输入
name dept_id sex
悟空 A 男
大海 A 男
宋宋 B 男
凤姐 A 女
婷姐 B 女
婷婷 B 女
输出
dept_id 男 女
A 2 1
B 1 2
- select
- dept_id,
- sum(case sex when '男' then 1 else 0 end) male_count,
- sum(case sex when '女' then 1 else 0 end) female_count
- from
- emp_sex
- group by
- dept_id;
CONCAT(string A/col, string B/col…):
CONCAT_WS(separator, str1, str2,...)
COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段。
输出
name constellation blood_type
孙悟空 白羊座 A
大海 射手座 A
宋宋 白羊座 B
猪八戒 白羊座 A
凤姐 射手座 A
输出 把星座和血型一样的人归类到一起
射手座,A 大海|凤姐
白羊座,A 孙悟空|猪八戒
白羊座,B 宋宋
- select
- t1.base,
- concat_ws('|', collect_set(t1.name)) name
- from
- (select
- name,
- concat(constellation, ",", blood_type) base
- from
- person_info) t1
- group by
- t1.base;
EXPLODE(col):将hive一列中复杂的array或者map结构拆分成多行。
LATERAL VIEW
用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
输入
movie category
《疑犯追踪》 悬疑,动作,科幻,剧情
《Lie to me》 悬疑,警匪,动作,心理,剧情
《战狼2》 战争,动作,灾难
输出
《疑犯追踪》 悬疑
《疑犯追踪》 动作
《疑犯追踪》 科幻
《疑犯追踪》 剧情
《Lie to me》 悬疑
《Lie to me》 警匪
《Lie to me》 动作
《Lie to me》 心理
《Lie to me》 剧情
《战狼2》 战争
《战狼2》 动作
《战狼2》 灾难
- select
- movie,
- category_name
- from
- movie_info lateral view explode(category) table_tmp as category_name;
窗口函数
OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化
CURRENT ROW:当前行
n PRECEDING:往前n行数据
n FOLLOWING:往后n行数据
UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点
LAG(col,n):往前第n行数据
LEAD(col,n):往后第n行数据
NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。注意:n必须为int类型。
Rank函数
RANK() 排序相同时会重复,总数不会变
DENSE_RANK() 排序相同时会重复,总数会减少
ROW_NUMBER() 会根据顺序计算
1.stage task
Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
2.reduceByKey groupByKey
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
3.repartition coalesce
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
区别:
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
4.Spark中的缓存机制(cache和persist)与checkpoint机制
都是做RDD持久化的
cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。
checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。
5.当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
写一个wc吧
- val conf: SparkConf =
- new SparkConf().setMaster("local[*]").setAppName("WordCount")
-
- val sc = new SparkContext(conf)
-
- sc.textFile("/input")
- .flatMap(_.split(" "))
- .map((_, 1))
- .reduceByKey(_ + _)
- .saveAsTextFile("/output")
-
- sc.stop()
夜深了,有时间了继续补充
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。