赞
踩
master&worker:(spark独立部署模式里的概念):
master是一个进程,主要负责资源的调度和分配,进行集群的监控,类似于yarn的RM。
worker也是一个进程,一个Worker运行在集群中的一台服务器上,由Master分配资源对数据进行并行的处理和计算,类似于yarn中的NM。
Driver&Executor:
Driver是Spark驱动器节点,用于执行spark任务中的main方法,负责实际代码的执行工作。
Executor是集群工作节点(Worker)中的一个JVM进程,负责在Spark作业中运行具体的任务,任务彼此间相互独立。如果有Executor节点发生了故障,spark应用也可继续执行,会将出错节点上的任务调度到其他Executor节点上继续运行。
ApplicationMaster(spark on yarn):
Hadoop用户向YARN集群提交应用程序时,提交程序包含ApplicationMaster,用于向资源调度器申请执行任务的资源容器Container,运行job,监控整个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。实现了资源(RM)和计算(driver)之间的解耦合。
spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
client:用户提交程序的入口。
1.构建Spark Application的运行环境(启动SparkContext)
2.SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
3.资源管理器分配Executor资源,Executor运行情况将随着心跳发送到资源管理器上;
4.SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler
5.Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行,SparkContext将应用程序代码发放给Executor。
6.Task在Executor上运行,运行完毕释放所有资源。
YarnClient 运行模式介绍:
1.client向ResouceManager申请启动ApplicationMaster,同时在SparkContext初始化中创建DAGScheduler和TaskScheduler
2.ResouceManager收到请求后分配container,在合适的NodeManager中启动ApplicationMaster
3.Dirver中的SparkContext初始化完成后与ApplicationMaster建立通讯,ApplicationMaster向ResourceManager申请Application的资源
4.一旦ApplicationMaster申请到资源,便与之对应的NodeManager通讯,启动Executor,并把Executor信息反向注册给Dirver
5.Dirver分发task,并监控Executor的运行状态,负责重试失败的task
6.运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己
YarnCluster 模式介绍:
1.任务提交后会和ResourceManager通讯申请启动ApplicationMaster
2.ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
3.Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册
4.Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
Yarn-client和Yarn-cluster的区别:
yarn-cluster模式下,Dirver运行在ApplicationMaster中,负责申请资源并监控task运行状态和重试失败的task,当用户提交了作业之后就可以关掉client,作业会继续在yarn中运行;
yarn-client模式下,Dirver运行在本地客户端,client不能离开。
executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我 们企业是4个//8
num-executors —— 启动executors的数量,默认为2//10
executor-memory —— executor内存大小,默认1G//15g
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M//30g
1.基于内存计算,减少低效的磁盘交互;
2.基于DAG的高效的调度算法;
3.容错机制Linege:数据丢失或者出错,可以根据血缘进行数据重建。
spark相对于mapreduce的特点:
减少磁盘I/O:MR 会把map端中间结果输出和结果存储在磁盘中,reduce端又需要从磁盘读取中间结果,造成磁盘I/O瓶颈,而Spark允许将map端的中间输出和结果存储在内存中,reduce从中间结果拉取,避免了大量的磁盘I/O
增加并行度 :由于把中间结果写入磁盘与从磁盘读取中间结果属于不同的环境,hadoop简单的通过串行执行链接起来,而Spark则把不同的环节抽象成Stage,允许多个Stage既可以串行又可以并行执行
MapReduce 默认是排序的,spark 默认不排序,除非使用 sortByKey 算子。
RDD是spark提供的核心抽象,全称为弹性分布式数据集,在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作。RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。
五大特性:
(1)A list of partitions
一个分区列表,RDD中的数据都存在一个分区列表里面
(2)A function for computing each split
作用在每一个分区中的函数
(3)A list of dependencies on other RDDs
一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
(4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
可选的,针对于kv类型的RDD才具有这个特性,作用是决定了数据的来源以及数据处理后的去向
(5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
可选项,数据本地性,数据位置最优
1.数据可以完全放内存或完全放磁盘,也可以部分放在内存,部分放在磁盘,可以自动切换;
2.RDD出错后可自动重新计算(通过血缘自动容错);
3.可checkpoint(设置检查点用于容错),可persist或cache进行缓存;
4.里面的数据是分片的(也叫分区,partition),分片的大小可自由设置和细粒度调整。
窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)
对于窄依赖: 窄依赖的多个分区可以并行计算; 窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
对于宽依赖: 划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。
区分这两种依赖很有用,首先,窄依赖允许在一个集群节点上以流水线的方式计算所有父分区,
而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。
第二窄依赖能够更有效地进行失效节点的恢复,即只需要重新计算丢失分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程); 原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
并行计算。
一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。
根据DAG有向无环图进行划分,从当前job的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个stage,然后继续按照这种方式在继续往前推,如在遇到宽依赖,又划分成一个stage,一直到最前面的一个算子。最后整个job会被划分成多个stage,而stage之间又存在依赖关系,后面的stage依赖于前面的stage。
核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。
Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推,首先会为最后一个 RDD 创建一个 Stage,然后继续倒推,如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止
1.RDD不支持sparkSQL操作,DF和DS支持sparkSQL。
2.DF每一行类型固定为Row,只有通过解析才能获取值。
3.DS提供了强类型支持,把每一行数据看作一个对象,在自定义了case class之后可以很自由的获得每一行的信息。
4.DF和DS支持方便地保存文件格式,可以直接指定。
spark中会导致shuffle操作的有以下几种算子:
1、repartition类的操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等;
2、byKey类的操作:比如reduceByKey、groupByKey、sortByKey等;
3、join类的操作:比如join、cogroup等。
Action算子:reduce(func)、collect()、count()、first()、take(n)、takeSample(withReplacement, num, [seed])、aggregate ()、saveAsTextFile(path)、foreach(func)、countByKey()
transformation算子:
一、Value类型
map()映射、flatMap()扁平化、groupBy()分组、filter()过滤、distinct()去重、sortBy()排序
二、key_Value类型
mapValues()只对V进行操作
groupByKey()按照K重新分组
reduceByKey()按照K聚合V
sortByKey()按照K进行排序
转换算子(transformations):
转换算子会从一个已经存在的数据集(RDD)中生成一个新的数据集(RDD),比如map就是一个转换算子,它通过映射关系从一个RDD生成了一个新的RDD。
行动算子(actions):
行动算子在进行数据集计算后会给driver程序返回一个值。
转换算子和行动算子最大的区别:
转换算子返回一个数据集而行动算子返回一个具体值,如reduce算子是行动算子 而 reducebykey是转换算子;
同时由于spark的惰性求值特性,所有的转换算子是不会立即计算结果的,转换算子只记录它应用的数据集,在行动算子需要给drive返回数据时转换算子才会去计算结果。(这个设计能让spark运行效率更高)
转换(transformations) :从已经存在的数据集中创建一个新的数据集,会创建一个新的RDD,例如map操作,会把数据集的每个元素传给函数处理,并生成一个新的RDD,常见如:Map,Filter,FlatMap,GroupByKey,ReduceByKey,Join,Sort,PartionBy
动作(actions) :在数据集上进行计算之后返回一个值到驱动程序,例如reduce动作,使用函数聚合RDD所有元素,并将结果返回给驱动程序,常见有:Collect,Reduce,Save,Lookup
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象
flatMap:对RDD每个元素转换,然后再扁平化,将所有的对象合并为一个对象,会抛弃值为null的值
累加器和广播变量属于共享变量,累加器是只写变量,广播变量是只读变量。
累加器:
原理:累加器用来把Executor端变量信息聚合到Driver端,在driver程序中定义的变量,在Executor端的每个task都会得到一份新的副本,每个task更新这些副本的值后,传回driver端进行合并。
用途:累加器的常见用途是在调试时对作业执行的过程中的事件进行计数。例如:统计 100 内的偶数的个数
用法:
通过调用 SparkContext 的 accumulator(initiaValue) 方法来创建累加器 ac
在 scala 中通过 += 来更改 ac(java 中通过 add 来修改)
使用 ac.value 来访问累加器的值
广播变量:
用途:
当多个 Executor 中的多个 Task 操作需要使用(读取)同一个很大变量时,如果我们采取常规方式把该变量发送到每一个 task 中,那么会极大地浪费性能,所以我们可以直接把该变量发送到每一个 Executor 上,Executor 上对应的 Task 可以共同访问该变量,这样就可以提高性能。
scala> val sourceRDD = sc.makeRDD(1 to 100, 3)
sourceRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :24
//1. 创建累加器
scala> val accumulator = sc.accumulator(0)
//2. 修改累加器的值
scala> val test = sourceRDD.map(x => {if(x % 2 == 0) accumulator += 1})
//3. 访问累加器
scala> println(accumulator.value)
50
血缘容错:血缘容错记录了较粗粒度的操作:例如filter、map、join,当rdd的部分分区数据丢失的时候,可以通过血缘来重新运算以及恢复丢失的分区。
checkpoint机制:如果是窄依赖:只要把丢失的父依赖的分区重新计算即可;但是是宽依赖:需要恢复父依赖的分区并且重新计算,开销会大。
因此有了checkpoint机制:将内存的变化持久化到磁盘持久存储,可以把RDD保存在hdfs的namenode中元数据edit log中并刷新到磁盘fsimage,斩断所需的依赖链,如果没有才往前追溯。
持久化机制:
cache机制:将RDD的结果写入内存,运行后缓存自动消失;
persist机制:将结果写入磁盘。
1.窄依赖计算结果会出现大量小文件,因此采用coalese方法和repartition方法最后返回一个特定分区的RDD;
2. 降低spark的并行度,生成的文件就会少一些;
3. 新增一个任务专门合并小文件。
checkpoint是安全可靠、不保留RDD血统的持久化方式,checkpoint 的数据通常是保存在高可用的文件系统中,比如 HDFS 中,所以数据丢失可能性比较低,Checkpoint 首先会调用 SparkContext 的 setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说 HDFS;然后对 RDD 调用 checkpoint()方法。之后在 RDD 所处的 job 运行结束之后,会启动一个单独的 job,来将 checkpoint 过的 RDD 数据写入之前设置的文件系统,进行高可用、容错的类持久化操作
spark非常重要的一个功能特性就是可以将RDD持久化在内存中。
调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。
最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
区别:
repartition一定会发生shuffle,coalesce 根据传入的参数来判断是否发生shuffle。
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。
(1)cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间;
(2)cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;
reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。
1、PROCESS_LOCAL : 进程本地化,指task计算的数据在本进程(Executor)中
2、NODE_LOCAL:节点本地化,指task计算的数据在本节点(node)的磁盘上,当task在本进程中一直没有执行(如果Driver分发task 3s后没有执行,且重复5次后),此时Driver就把这个没有执行的task发送到本节点的其他executor中执行
3、NO_PREF:没有本地化这一说,无需本地化,如计算所需的数据在关系型数据中(MySQL或Oracle),node1节点中的MySQL,可以被node2或node3节点连接使用。
4、RACK_LOCAL:task计算的数据是在本机架的其他节点上
5、ANY:随机,任何地方都可以
优先级依次是1到5,DESC
Spark中的数据倾斜,包括Spark Streaming和Spark Sql,表现主要有下面几种:
1.Executor lost,OOM,Shuffle过程出错;
2.Driver OOM;
3.单个Executor执行时间特别久,整体任务卡在某个阶段不能结束;
4.正常运行的任务突然失败;
groupByKey;reduceByKey;aggregaByKey;join等算子引发的shuffle 操作,是导致数据倾斜可能发生的关键点所在:数据倾斜为某一个或者某几个 partition 的数据特别大,导致这几个 partition 上的计算需要耗费相当长的时间。
避免数据倾斜,解决方法,有多个方面:
1.前提是定位数据倾斜,是 OOM 了,还是任务执行缓慢,看日志,看 WebUI
2.避免不必要的 shuffle,如使用广播小表的方式,将 reduce-side-join 提升为 map-side-join
3.分拆发生数据倾斜的记录,分成几个部分进行,然后合并 join 后的结果
4.改变并行度,可能并行度太少了,导致个别 task 数据压力大
5.两阶段聚合,先局部聚合,再全局聚合
6.自定义 paritioner,分散 key 的分布,使其更加均匀
DStream是spark streaming提供的一种高级抽象,代表了一个持续不断的数据流。
DStream可以通过输入数据源来创建,比如Kafka、flume等,也可以通过其他DStream的高阶函数来创建,比如map、reduce、join和window等。
DStream内部其实不断产生RDD,每个RDD包含了一个时间段的数据。
Spark streaming一定是有一个输入的DStream接收数据,按照时间划分成一个一个的batch,并转化为一个RDD,RDD的数据是分散在各个子节点的partition中
Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。
receiver 方式:将数据拉取到 executor 中做操作,可以通过 WAL,设置了本地存储,保证数据不丢失,然后使用 Kafka 高级 API 通过 zk 来维护偏移量,保证消费数据。receiver 消费的数据偏移量是在 zk 获取的,此方式效率低,容易出现数据丢失。
receiver 方式的容错性:在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用 Spark Streaming 的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的 Kafka 数据写入分布式文件系统(比如 HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
基于 Direct 方式:使用 Kafka 底层 Api,其消费者直接连接 kafka 的分区上,因为 createDirectStream 创建的 DirectKafkaInputDStream 每个 batch 所对应的 RDD 的分区与 kafka 分区一一对应,但是需要自己维护偏移量,即用即取,不会给内存造成太大的压力,效率高。
优点:简化并行读取:如果要读取多个 partition,不需要创建多个输入 DStream 然后对它们进行 union 操作。Spark 会创建跟 Kafka partition 一样多的 RDD partition,并且会并行从 Kafka 中读取数据。所以在 Kafka partition 和 RDD partition 之间,有一个一对一的映射关系。
背压机制:根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。主要是为了更好的协调数据接收速率与资源处理能力。启用反压机制很简单,只需要将 spark.streaming.backpressure.enabled 设置为 true 即可,这个参数的默认值为 false
对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。
由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
(1)repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做: Dstream.foreachRDD(rdd=>{ rdd.repartition(1).foreachPartition(partition=>{ // 开启事务 partition.foreach(each=>{// 提交数据 }) // 提交事务 }) })
(2)将结果和 offset 一起提交
也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。
计算节点不再接受新数据,而是将现有的数据处理完毕,然后再关闭。
具体实现:
另起一个线程:
new Thread(
{
new Runnable{
override def run(): Unit = {
//mysql:增加一行记录,若查询表,记录存在,则执行关闭操作
//redis: 在redis中设置一个标识,若满足要求,则进行关闭
//zk: 设置/stopSpark 节点,若节点存在则进行关闭
while(ture){
//查询出状态,假设状态为true,执行以下操作
if(true){
//获取SparkStreaming状态
val state : StreamingContextState = ssc.getState()
if( state == StreamingContextState.ACTIVE){
ssc.stop(stopSparkContext =true , stopGracefully = true)
}
}
Thread.sleep(millis = 5000)
System.exit(status = 0)
}
}
).start()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。