赞
踩
未被external修饰的是内部表,被external修饰的是外部表;
内部表数据由Hive自身管理,外部表由HDFS管理;
删除内部表会直接删除元数据及存储数据,删除外部表,仅仅会删除元数据,数据文件不会删除。
分区是按照分区字段在HDFS上建立子文件夹,分区内的数据存放在子文件夹内,查询时不需要全局扫描,只扫描对应分区文件夹的数据。
而分桶是按分桶字段对数据取hash值,值相同的放在同一个分桶文件里,分桶生成的是分桶文件,分区对应的是子文件夹;
分区和分桶最大的区别是:
首先,分桶是随机的分割数据,分区是非随机的分割数据,分桶是按照分桶字段取哈希函数,相对比较均匀;分区表按照分区字段的值进行分割,容易产生数据倾斜。
其次,分桶对应的是不同的文件,分区对应的是HDFS上的目录,桶表对应的是目录里的文件。
Hive的结构分为用户接口client、元数据metaastore、hadoop和驱动器driver
附加:Hive压缩格式:gzib,bzip2,lzo,snappy,其中snappy的压缩比和压缩速度及解压缩速度最快(我司用parquet存储+snappy压缩格式)
1.动态分区插入数据,产生大量的小文件,从而导致map数量剧增。
2.reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的)。
3.数据源本身就包含大量的小文件。
1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存,这样NameNode内存容量严重制约了集群的扩展。
从小文件产生的途经就可以从源头上控制小文件数量
1.使用ORC或Parquet格式作为表存储格式,不要用textfile,在一定程度上可以减少小文件。
2.减少reduce的数量(可以使用参数进行控制)。
3.少用动态分区,用时记得按distribute by分区。
4.使用hadoop archive命令把小文件进行归档。
5.开启map端小文件合并
Map: 合并小文件,减少map个数:set hive.merge.mapredfiles = true; --开启小文件合并增加map个数:当文件很小,小于128M,但是数据量有几千万行,放到一个map中执行很慢,可以通过自定义map个数增加map数:set mapred.map.tasks=10;
Reduce: reduce的个数设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会根据默认配置计算reduce个数.
reduce个数的计算公式:Min(每个job最大reduce数,总输入数据量/每个reduce处理的数据量)
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
手动指定reduce个数:set mapred.reduce.tasks = 15;
1.没使用group by, select count()
2.用了order by
3.有笛卡尔集
(这些情况都是全局的)
set hive.exec.parallel=true; //打开任务并行执行
set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。
解决方案:
第一种:可以直接不让null值参与join操作,即不让null值有shuffle阶段
第二种:因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样它们的hash结果就不一样,就会进到不同的reduce中:
解决方案:
如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了:
解决方案:
这种数据倾斜问题没有什么好的解决方案,只能将使用GZIP压缩等不支持文件分割的文件转为bzip和zip等支持文件分割的压缩方式。
所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和Zip等支持文件分割的压缩算法。
解决方案:
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
解决方案:
通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在Map阶段完成join操作,即MapJoin,这避免了 Shuffle,从而避免了数据倾斜。
解决方案:
这类问题最直接的方式就是调整reduce所执行的内存大小。
调整reduce的内存大小使用mapreduce.reduce.memory.mb这个配置。
SparkContext向资源管理器注册并申请资源 ---->
资源管理器根据资源情况分配Executor,然后启动Executor -->
Executor启动后发送心跳至资源管理器 --->
SparkContext根据RDD之间依赖关系构建DAG运行图 --->
然后将DAG图分解成多个Stage --->
把Stage发送给StageScheduler -->
每个stage包含一个或多个task任务,StageScheduler将Task发放给Executor运行,同时SparkContext将应用程序代码发给Executor --->
Task在Executor上运行,运行完毕释放所有资源
Stage划分依据:Stage划分的依据是宽依赖,遇到窄依赖就加入本stage,遇到宽依赖进行Stage切分;
何时产生宽依赖:reduceByKey, groupByKey等算子,会导致宽依赖的产生。
Yarn-Cluster模式下,Driver运行在Application Master上,Application Master 同时负责从Yarn申请资源并监督作业的运行情况,用户提交作业之后,客户端就可以关闭,作业会继续在Yarn上运行。
Yarn-clinet模式下,Driver是运行在客户端,Applicatiion Master仅仅向Yarn请求Executor,然后有client跟containner通信调度工作,所以client不能关闭。
联系:两者都是将Mapper端的输出数据按key进行Partition,不同的Partition送到不同的reduce进行处理;Spark中的很多计算是作为MR计算框架的一种优化实现。
区别:
shuffle 就是一个GroupByKey的过程,两者没有本质区别。只是MR有多次排序,Spark尽量避免了MR shuffle中的多余的排序
MR将处理流程划分出明显的几个阶段:map,splil,merge,shuffle,sort,reduce等,每个阶段各司其职,按串行的方式实现每个阶段的功能;
在Spark中,没有这样功能明确的阶段,只有不同的Stage和一系列的transformation操作,所以splil,merge,aggregate等操作都包含着Transformation中
Mr 只能从一个map stage接受数据,Spark可以从多个Map stage shuffle数据(宽依赖,可以有多个数据源)
Spark的粒度更细,可以更及时的获取到record 与HashMap中相同的records进行合并
Spark考虑的更全面,Spark针对不同类型的操作、不同类型的参数,会使用不同类型的shuffle
sortBykey,groupBykey、reduceBykey、join、countBykey、cogroup等聚合操作的算子
num-executors --执行器个数
executor-memory --每个Executor的内存大小 (一般 所有Executor的总内存占用量不要超过 Yarn 的内存资源的 50%)
executor-cores --每个Executor的并行执行的task个数 (一般情况下总核数不要超过 Yarn 队列中 Cores 总数量的 50%。)
driver-memory --driver内存大小,一般默认1g
spark.default.parallelizm: --每个Stage的默认Task数量
原理:数据倾斜就是进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。
数据倾斜问题发现和定位:通过spark web ui 来查看当前运行的stage各个task分配的数据量, 从而进一步确定是不是task分配的数据不均匀导致了数据倾斜,知道数据倾斜发生在哪个stage之后,我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类型的算子,通过countByKey查看各个key的分布.
数据倾斜的解决方案
方案实现思路:这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
方案实现思路:将含有较多倾斜key的RDD扩大多倍,与相对分布均匀的RDD配一个随机数。
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖。
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。