当前位置:   article > 正文

Spark原理_spark测试原理

spark测试原理

一、Spark原理

  1Spark的运行原理

i、分布式

Ii、主要基于内存(少数情况基于磁盘)

Iii、迭代式计算

2、Spark 计算模式 VS  MapReduce  计算模式对比

 

          Mr这种计算模型比较固定,只有两种阶段,map阶段和reduce阶段,两个阶段结束    后,任务就结束了,这意味着我们的操作很有限,只能在map阶段和reduce阶段,    也同时意味着可能需要多个mr任务才能处理完这个job

 

   Spark 是迭代式计算,一个阶段结束后,后面可以有多个阶段,直至任务计算完      成,也就意味着我们可以做很多的操作,这就是Spark计算模型比mr 强大的地方

 

三、什么是Spark RDD

 

1、什么是RDD

弹性的,分布式的,数据集

 

RDD在逻辑上可以看出来是代表一个HDFS上的文件,他分为多个分区,散落 Spark的多个节点上)

3、RDD----弹性

RDD的某个分区的数据保存到某个节点上,当这个节点的内存有限,保存不了这个 分区的全部数据时,Spark就会有选择性的将部分数据保存到硬盘上,例如:当worker 的内存只能保存20w条数据时,但是RDD的这个分区有30w条数据,这时候Spark 会将多余的10w条数据,保存到硬盘上去。Spark的这种有选择性的在内存和硬盘之间的权衡机制就是RDD的弹性特点所在

 

4、Spark的容错性

RDD最重要的特性就是,提供了容错性,可以自动的从失败的节点上恢复过来,即如 果某个节点上的RDD partition(数据),因为节点的故障丢了,那么RDD会自动的通过 自己的数据来源重新计算该partition,这一切对使用者来说是透明的


2、Spark的开发类型

 

   1)、核心开发:离线批处理 演示性的交互式数据处理

        

       2)、SQL查询:底层都是RDD和计算操作

 

       3)、底层都是RDD和计算操作

 

       4)、机器学习

 

       5)、图计算

 

3Spark 核心开发(Spark-core == Spark-RDD)步骤

 

   1)、创建初始的RDD

 

   2)、对初始的RDD进行转换操作形成新的RDD,然后对新的RDD再进行操作,直 至操作计算完成

 

3)、将最后的RDD的数据保存到某种介质中(hivehdfsMySQLhbase...

 

五、Spark原理

DriverMasterWorkerExecutorTask各个节点之间的联系

 

 

Spark中的各节点的作用:

1、driver的作用:

    1)、 向master进行任务的注册

(2)、构建运行任务的基本环境

(3)、接受该任务的executor的反向注册

(4)、向属于该任务的executor分配任务

 

2、什么是driver

   我们编写的程序打成jar包后,然后找一台能够连接spark集群的节点做任务的driver,具体的表现为SparkSubmit

 

3、Master的作用:

   1)、监控集群;

   2)、动态感知worker的上下线;

   3)、接受driver端注册请求;

   4)、任务资源的调度

 

4、Worker的作用:

   1)、定时向master汇报状态;

   2)、接受master资源调度命令,进行资源的调度

   3)、启动任务的容器Executor

 

5、Executor的作用:

   1)、保存计算的RDD分区数据;

   2)、向Driver反向注册;

   3)、接受Driver端发送来的任务Task,作用在RDD上进行执行

 

 

Spark 编程的流程:

 

1、我们编写的程序打包成jar包,然后调用Spark-Submit 脚本做任务的提交

 

2、启动driver做任务的初始化

 

3、Driver会将任务极其参数(corememorydriver相关的参数)进行封装成ApplicationDescript通过taskSchedulerImpl 提交给Master

 

4、Master接受到driver端注册任务请求时,会将请求参数进行解析,并封装成APP,然后进行持久化,并且加入到其任务队列中的waitingAPPs

 

5、当轮到咱们提交的任务运行时,master会调用schedule()这个方法,做任务资源调度

 

6、Master将调度好的资源封装成launchExecutor,发送给指定的worker

 

7、Worker接收到发送来的launchExecutor时,会将其解析并封装成ExecutorRunner,然后调用start方法,启动Executor

 

8、Executor启动后,会向任务的Driver进行反向注册

 

9、当属于这个任务的所有executor启动成功并反向注册完之后,driver会结束SparkContext对象的初始化

 

10、sc 初始化成功后,意味着运行任务的基本环境已经准备好了,driver会继续运行我们编写好的代码

 

11、开始注册初始的RDD,并且不断的进行转换操作,当触发了一个action算子时,意味着触发了一个job,此时driver就会将RDD之间的依赖关系划分成一个一个的stage,并将stage封装成taskset,然后将taskset中的每个task进行序列化,封装成launchtask,发送给指定的executor执行

 

12、Executor接受到driver发送过来的任务task,会对task进行反序列化,然后将对应的算子(flatmapmapreduceByKey。。。。)作用在RDD分区上

 

 

六、RDD详解

 

 1、什么是RDD

RDDResilient Disttibuted Dataset)叫做弹性的分布式的数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可并行计算的集合

 

 2RDD的特点:

自动容错

位置感知性调度

伸缩性

 

 3RDD的属性:

(1)、一组分片(partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度,用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值,默认值就是程序所分配到的CPU Core的数目

(2)、一个计算每个分区的函数。SparkRDD的计算是以分片为单位的,每个RDD都会实现computer函数以达到这个目的。Computer函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)、RDD之间的依赖关系RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

(4)、一个partition,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于hashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-valueRDD,才会有Partitioner,非key-valueRDDPartitioner的值是NonePartitioner函数不但决定了RDD本身的分片数量,也决定了partition RDD Shuffle输出时的分片数量。

(5)、一个列表,存储存取每个Partition的优先位置(preferred location。对于一个HDFD文件来说。这个列表保存的就是每个Partition所在的快的位置。按照“移动数据不如移动计算”的理念。Spark在进行任务调度的时候,会尽可能的将计算任务分配到所要处理数据块的存储位置。

 

 

4、RDD的创建:

   进行Spark核心编程时,首先要做的事就是创建一个初始的RDDSpark Core提供了三种创建RDD的方式:

(1)、使用程序中的集合创建RDD (调用parallelize()方法)

(2)、使用本地文件创建RDD  (调用textFile()方法)

(3)、使用HDFD文件创建RDD  (调用textFile()方法)

 

七、算子

   

   1、什么是算子?

RDD中定义的作用在每一个RDD分片上的函数,可以对RDD中的数据进行转换 和操作

 

   2RDD算子的分类

(1)Transformation算子,这类算子变换不触发提交作业(特点就是lazy特性)

返回的是一个RDD

(2)Action算子,这类算子会触发SparkContext提交作业(触发一个spark job的运行,从而触发这个action之前所有的transformation的执行)

返回的是一个spark对象

   

   3、常用的Transformation算子

八、RDD分区排序

 

  I、分区

两种实现方式:coalesce  和 repartition(底层调用coalesce

 

coalesce(numPartitons,isShuffle)

第一个参数是重分区后的数量,第二个参数是是否进行shuffle

如果原来有N个分区,重分区后有M个分区

如果 M > N ,必须将第二参数设置为true(也就是进行shuffle,等价于 repartition(numPartitons)    如果是false将不起作用  

如果M < N

100-->10 重分区后的分区数比原来的小的多,那么久需要使用shuffle,也即是设置为true

100-->90 重分区后的分区数和原来的差不多的,那么就不需要使用shuffle,也就是设置为false

 

II、排序

sortBy(x => x)  这个算子中带有隐式转换参数

 

能够排序(比较大小),那么这个类就必须有比较大小的功能,也就是实现了compareTo 或者compare

 

实现二次排序有两种方法:

1、继承Comparable 接口 或者 Ordered

2、隐式转换:可以定义隐式转换函数(Ordered)或者隐式转换值(Ordering

 

九、自定义分区

 

自定义分区

要求:按照key将对应的value输出到指定的分区中

解释:自定义一个自定义分区类,继承partitioner,实现他的两个方法

      1numPartitions

      2getPartition

具体的功能根据项目的要求自定义实现,然后调用partitionBy方法,new出自定义的类,传入参数即可

九、RDD持久化原理

   

1、持久化场景:对于一个rdd会被多次引用到,并且这个rdd计算过程复杂,计算时间特变耗时

 

2、如何进行持久化,调用rdd.persist方法或cache方法,cache方法底层就是调用persist方法

 

******************persist(StorageLevel.MEMORY_ONLY)*******************

如果对RDD做持久化,默认持久化级别是storageLevel.MEMORY_ONLY ,也就是持久化到内存中去,这种持久化级别是效率最快的,但是由于是纯Java 对象,保存到内存中,那么内存可能保存的数量就会较少

***************persist(StorageLevel.MEMORY_ONLY_SER)****************

如果当我们集群资源有限时,那么我们可以采用MEMORY_ONLY_SER,也就是将Java对象进行序列化之后持久到内存中去,这种持久化的好处是能够持久化更多的数据到内存中,但是由于在持久化时需要序列化,取出来之后又需要反序列化这一过程,这个过程会消耗CPU计算资源,性能相对于MEMORY_ONLY 这种持久化级别来说稍微弱点,但是还是比较高效的

 

3、如何选择RDD持久化策略?

Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍,下面是一些通用的持久化级别的选择建议:

  1)、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略,因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作

  2)、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化存储,纯内存操作还是非常快的,只是要消耗CPU进行反序列化

  3)、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了

  4、能不使用DISK相关的策略,就不要使用,有的时候,从磁盘读取数据,还不如重新计算一次

 

 

十一、共享变量

 

1、共享变量分为两种:广播变量     累加器

 

广播变量(broadcast

 

2、日常所遇问题

  因为每个task都需要拷贝这样的一个副本到executor去执行,那么我们可以想象一下,如果有1000 task在某个worker上执行,而这个副本有100M,那么意味着我们需要拷贝100G的数据都到某个worker上执行,这样的话会大大消耗我们的网络流量,同时会加大executor的内存消耗,从而增加了我们spark作业的运行时间,大大降低了spark作业的运行效率,增加了作业失败的概率

 

3、如何解决以上问题,也就是说什么时候使用广播变量?

  RDD引用到了一个外部变量并且这个外部变量数据量不小,同时这个RDD对应的task数量特别多,那么此时使用广播共享变量再合适不过了

  我们可以将这种大的外部变量做成广播变量,外部变量做成广播变量的时候,那么每个executor的内存中只会有一个外部变量而这个副本针对所有的task都是共享的,这样的话就减少了网络流量消耗,降低了executor的内存消耗,提高了spark作业运行效率和缩短了运行时间,同时降低了作业失败的概率

 

 

4、广播变量的使用流程:

   1)、某个executor的第一个task先执行,首先会从自己的blockManager中查找外部变量,如果没有就从邻居的executorblockManager的内存中获取这个外部变量,如果还是获取不到,就从driver端获取,拷贝这个外部变量到本地的executorblockManager

   2)、当这个executor的其他task执行时,就不需要从外面获取这个外部变量的副本,直接从本地的blockManager中获取即可

 

 

5、如何获取广播变量的值?

   可以直接调用广播变量的value() 这个方法即可

 

【注意】广播变量是只读的,不可写

 

 

累加器(Accumulator

 

Spark提供的Accumulator ,主要用于多个节点对一个变量进行共享性的操作,Accumulator只是提供了累加的功能。但是却给我们提供了多个task对一个变量并行操作的功能,但是task只能对Accumulator进行累加操作

【注意】task只能对Accumulator进行类加操作,只有Driver程序可以读取Accumulator的值

RDD分区和容错机制讲解

1、RDD Lineage血统

   RDD只支持粗粒度转换,即在大量记录上执行的单个操作,将创建RDD的一系列Lineage(血统)记录下来。以便恢复丢失的分区

 

2、RDD的依赖关系

   RDD和它的父RDD的关系有两种不同的类型:

1)、窄依赖(一对一,多对一)

形象的比喻:独生子女

2)、宽依赖(多对多)

形象的比喻:超生

 

注释:划分stage的依据就是宽依赖,也就是RDD之间是否有shuffleshuffle过程就是一个宽依赖过程,shuffle之前的tasks就属于一个stageshuffle之后的也属于一个stageshuffle之前和之后的操作都是窄依赖

【注意】shuffle过程分为:shuffle Write过程 和 shuffle read过程

 

4、DAG的生成(有向无环图)和任务的划分

   DAGDirected Acyclic Graph)叫做有向无环图(有方向无循环的图)

 

5、一个wordCount过程会产生多少个RDD

   至少会产生五个RDD

第一个,从HDFS中加载后得到一个RDD(即使用sc.textFile()算子),即HadoopRDD

  sc.textFile()过程中还会产生一个RDD(调用map算子),产生一个MapPartitionRDD

第二个,使用flatMap算子,得到一个MapPartitionRDD

第三个,使用map算子,得到一个MapPartitionRDD

第四个,使用reduceByKey算子,也就是在经过了shuffle过程后又会得到一个shuffledRDD

第五个,使用saveAsTextFile算子,再产生一个MapPartitionRDD 


spark程序提交流程讲解

Spark任务简介:

   Spark-submit--->SparkSubmit-->main-->submit-->doRunMain-->RunMain-->通过反射创建我们编写的主类的实例对象,调用main方法-->开始执行我们编写的代码-->初始化SparkContext对象-->创建初始的RDD-->触发action算子-->提交job-->worker执行任务-->任务结束

 

Spark任务详解: 

  1)、将我们编写的程序打成jar

 

  2)、调用spark-submit脚本提交任务到集群上运行

 

  3)、运行sparkSubmitmain方法,在这个方法中通过反射的方式创建我们编写的主类的实例对象,然后调用main方法,开始执行我们的代码(注意,我们的spark程序中的driver就运行在sparkSubmit进程中)

 

  4)、当代码运行到创建SparkContext对象时,那就开始初始化SparkContext对象了

 

  5)、在初始化SparkContext对象的时候,会创建两个特别重要的对象,分别是:DAGScheduler

TaskScheduler

 

DAGScheduler的作用】将RDD的依赖切分成一个一个的stage,然后将stage作为taskSet提交给DriverActor

 

  6)、在构建taskScheduler的同时,会创建两个非常重要的对象,分别是DriverActorClientActor

 

clientActor的作用】向master注册用户提交的任务

DriverActor的作用】接受executor的反向注册,将任务提交给executor

 

  7)、clientActor启动后,会将用户提交的任务和相关的参数封装到ApplicationDescription对象中,然后提交给master进行任务的注册

 

  8)、master接受到clientActor提交的任务请求时,会将请求参数进行解析,并封装成Application,然后将其持久化,然后将其加入到任务队列waitingApps

 

  9)、当轮到我们提交的任务运行时,就开始调用schedule(),进行任务资源的调度

 

  10)、master将调度好的资源封装到launchExecutor中发送给指定的worker

 

  11)、worker接受到Maseter发送来的launchExecutor时,会将其解压并封装到ExecutorRunner中,然后调用这个对象的start(), 启动Executor

 

  12)、Executor启动后会向DriverActor进行反向注册

 

  13)、driverActor会发送注册成功的消息给Executor

 

  14)、Executor接受到DriverActor注册成功的消息后会创建一个线程池,用于执行DriverActor发送过来的task任务

 

  15)、当属于这个任务的所有的Executor启动并反向注册成功后,就意味着运行这个任务的环境已经准备好了,driver会结束SparkContext对象的初始化,也就意味着new SparkContext这句代码运行完成

 

  16)、当初始化sc成功后,driver端就会继续运行我们编写的代码,然后开始创建初始的RDD,然后进行一系列转换操作,当遇到一个action算子时,也就意味着触发了一个job

 

  17)、driver会将这个job提交给DAGScheduler

 

  18)、DAGScheduler将接受到的job,从最后一个算子向前推导,将DAG依据宽依赖划分成一个一个的stage,然后将stage封装成taskSet,并将taskSet中的task提交给DriverActor

 

  19)、DriverActor接受到DAGScheduler发送过来的task,会拿到一个序列化器,对task进行序列化,然后将序列化好的task封装到launchTask中,然后将launchTask发送给指定的Executor

 

  20)、Executor接受到了DriverActor发送过来的launchTask时,会拿到一个反序列化器,对launchTask进行反序列化,封装到TaskRunner中,然后从Executor这个线程池中获取一个线程,将反序列化好的任务中的算子作用在RDD对应的分区上

 

【注意】

Spark的任务分为为两种:

  ashuffleMapTaskshuffle之前的任务

  bresultTaskshuffle之后的任务

 

Spark任务的本质:

  RDD的依赖关系切分成一个一个的stage,然后将stage作为TaskSet分批次的发送到Executor上执行

 

 

十三、Checkpoint

  

 1、使用checkpoint的场景:

     某个RDD会被多次引用,计算特别复杂,计算特别耗时

     担心中间某些关键的,在后面会反复几次使用的RDD,可能会因为节点的故障,导致持久化数据的丢失

 

 2、如何对RDD进行checkpoint

1)、设置还原点目录,设置checkpoint目录

2)、调用RDDcheckpoint的方法对该RDD进行checkpoint

 

 3checkpoint的原理

1)RDD调用了checkpoint方法之后,就接受RDDCheckpointData对象的管理

2)RDDCheckpointData对象会负责将调用了checkpointRDD 的状态设置为MarkedForCheckpoint

3)、当这个RDD所在的job运行结束后,会调用最后一个RDDdoCheckpoint,根据其血统向上查找,查找到被标注为MarkedForCheckpoint状态的RDD,将其状态改变为checkpointingInProgress

4)、启动一个单独的job,将血统中标记为checkpointingInProgressRDD进行checkpoint,也就是将RDD的数据写入到checkpoint的目录中去

5)、当某个节点发生故障,导致包括持久化的数据全部丢失,此时会从还原点目录还原RDD的每个分区的数据,这样就不需要从头开始计算一次

 

4、checkpoint需要注意的地方

因为RDD在做checkpoint的时候,会单独启动一个job对需要进行checkpointRDD进行重新计算,这样就会增加spark作业运行时间,所以spark强烈建议在做checkpoint之前,应该对需要进行checkpointRDD进行持久化(即调用 .cache

 

5、checkpoint 和持久化的区别

1)、是否改变血统:

   持久化(.cache):不会改变RDD的依赖关系,也就是不会改变其血统

   Checkpoint:会改变RDD的血统,做了checkpointRDD会清除其所有的依赖关系,并将其父RDD强制设置为checkpointRDD,并且将RDD的状态更改为checkpointed

 

2)RDD的数据的可靠性:

   持久化:只是将RDD的数据持久化到内存或磁盘中,但是如果节点发生故障,那么持久化的数据还是会丢失

   Checkpointcheckpoint的数据保存在第三方高可靠的分布式的文件系统中,机试节点发生故障,数据也不会丢失,所以checkpoint比持久化可靠性更高

 

 

6、后续

   我们实现了checkpoint 之后,在某个task 又调用了该RDDiterator() 方法时,就实现了高容错机制,即使RDD的持久化数据丢失,或者压根儿就没有持久化,但是还是可以通过readCheckpointOrComputer() 方法,优先从父RDD-----checkpointRDD中读取,HDFS(外部文件系统)的数据

 

 

 

 

 

 

 

 

第二部分  spark-sql

 

一、Spark-SQL前世今生

 

  1Spark SQL的特点

1)、支持多种数据源:HiveRDDParquetJSONJDBC等。

2)、多种性能优化技术:in-memory columnar storagebyte-code generationcost model动态评估等。

3)、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展

 

  2Spark SQL的性能优化技术简介

1)、内存列存储(in-memory columnar storage

2)、字节码生成技术(byte-code generation

3)、Scala代码编写的优化

 

 

  3Spark SQL and DataFrame

Spark SQLSpark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。Spark SQL最重要的功能之一,就是从Hive中查询数据。

 

DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD

 

 

二、Spark-sql的使用

 

  1RDD转换为DataFrame(两种)

1)、使用反射的方式来推断包含了特定数据类型的RDD的元数据

2)、通过编程接口来创建DataFrame

 

  2UDF自定义函数和UDAF自定义聚合函数

UDF,其实更多的是针对单行输入,返回一个输出

UDAF,则可以针对多行输入,进行聚合计算,返回一个输出,功能更加强大

 

  3Spark-SQL工作原理

SqlParse  --------->  解析器

 

Analyser  --------->  分析器

 

Optimizer  --------->   优化器

 

SparkPlan  --------->   物理计划


流程:

 

1)、自己编写的SQL语句

大家要知道,只要在数据库类型的技术里面,比如:最传统的MySQLOracle等,包括现在大数据领域的数据仓库,比如hive,他的基本的SQL执行的模型,都是类似的,首先都要生成一条SQL语句的执行计划

 

2)、通过SqlParser(解析器)生成未解析的逻辑计划(unresolved LogicalPlan

3)、通过Analyzer(分析器)生成解析后的逻辑计划(resolved LogicalPlan

4)、通过Optimizer(优化器)生成优化后的逻辑计划(optimized LogicalPlan

实际上,比如传统的Oracle等数据库,通常都会生成多个执行计划,然后呢,最后有一个优化器,针对多个计划,选择一个最好的计划,而SparkSql这儿的优化指的是,比如说,刚生成的执行计划中,有些地方的性能是显而易见的,不太好,举例说明:

比如说,我们有一个SQL语句,select name from (select ... from ...) where ..=..;

此时,在执行计划解析出来的时候,其实就是按照他原封不动的样子,来解析成可以执行的计划,但是呢,Optimizer 在这里其实就会对执行计划进行优化,比如说,发现where 条件,其实可以放在子查询中,这样,子查询的数量大大变小,可以优化执行速度,此时,可能就会变成如下这样:select name from (select name from ...where ..=..)

 

5)、通过SparkPlan,生成最后的物理计划(PhysicalPlan

到物理计划这里,那么其实就是非常“接地气”的计划了。就是说,已经很明朗了,从那几个文件读取什么数据,从那几个文件中读取,如何进行关联等等

 

6)、executor中执行物理计划

逻辑的执行计划,更多的是偏向于逻辑,比如说吧,大致就是这种样子的,

From table students=>filter ... => select name ...

这里基本上,逻辑计划都是采用Tree ,树形结构

 

7)、生成RDD

Select  name  from  students => 解析,从哪里去查询,students表,在哪个文件里,从哪个文件中查询哪些数据,比如说是name这个列,此外,复杂的SQL,还有,比如说查询时,是否对表中的数据进行过滤和筛选,更不用说,复杂时,需要有多表的JOIN(咋传统数据库中,比如MySQL,执行计划还涉及到如何扫描和利用索引)

 

 

 

  4spark-SQL性能优化

 

1)、设置shuffle过程的并行度spark.sql.shuffle.partitionsSQLContext.setConf()

 

2)、在hive数据仓库建设过程中,合理设置数据类型,比如能设置为int的,就不要设置为bigInt,减少数据类型导致不必要的内存开销

 

3)、编写SQL时,尽量给出明确的列名,比如select name from students。不要写select * 的方式。

 

4)、并行处理查询结果:对于spark-SQL查询的结果,如果数据量比较大,比如超过1000条,那么就不要一次性的collect()到driver再处理,使用foreach()算子,并行处理查询结果

5)、缓存表:对于一条SQL语句可能对此使用到的表,可以对其进行缓存,使用 sqlContext.cacheTable(tableName),或者DataFrame.cache()即可,spark-SQL会用内存列存储的格式进行表的缓存,然后spark-sql就可以仅仅扫描需要使用的列,并且自动优化压缩,来最小化内存使用和GC开销,SQLContext.uncacheTable(tableName)可以将表从缓存中移除,用SQLContextsetConf(),设置spark.sql.inMemoryColumnarStorage.batchSize参数(默认10000),可以设置列存储的单位

6)、广播joinspark.sql.autoBroadcastJoinThreshold,默认10485760 (10 MB)。在内存够用的情况下,可以增加其大小,参数设置了一个表在join的时候,最大在多大以内,可以被广播出去优化性能

 

 5Hive on Spark配置

1)、安转配置好HiveSpark

2)、Set hive.execution.engine=spark;

3)、set spark.master=spark://mini1:7077

 

 

 

第三部分 spark-streaming

 

1,  Dstream

 

DstreamsparkStreaming的数据模型,本质就是一连串不间断的RDD,但是它是一个时间段的RDD.这些时间段的RDD源源不断的连接在一起。

这个时间可以自己设置,时间设置的越短,实时性越高,但是性能消耗也越大。

 

 

2,  spark streamingkafka获取数据,有哪几种方式?

 

有两种方式:

1.通过receiver的方式,

2,通过direct的方式,dirrect的方式需要自己来管理偏移量。

 

 

3,  sparkStreamingstorm的区别

 

sparkStreamingspark里面的一个做流式准实时计算的组件,它使用的数据结构是DstreamDstream里面是一连串时间片的rdd

相比于stormsparkStreaming在实时性,保证数据不丢失方面都不占用优势,spark streamingspark支持者眼中的优势是spark Streaming具有高吞吐性,最本质来说,sparkStreaming相比于storm的优势是sparkStreaming可以和spark corespark SQL无缝整合。

 

 

4.对于需要多次引用的,并且这个dstream计算时间特别耗时,数据特别重要,那么我们就需要对dstream进行checkpoint,(只有多次引用的,进行持久化就可以了),因为即使对这个dstream进行持久化,数据也可能会丢失,而checkpoint数据丢失的可能性小,但是这样会影响spark-streaming的数据吞吐量,因为在做计算的同时,还需要将数据写入到外部存储系统中,会降低spark性能,影响吞吐量,非必要情况下不建议使用

 

5.如何对dstreamcheckpoint

 

首先设置还原点目录,其次调用dstreamcheckpoint方法

【注意】:dstreamcheckpoint的周期一定要是产生batch时间的整数倍,同时spark官方建议将checkpoint的时间设置为至少10秒。通常来说,将checkpoint间隔设置为窗口操作的滑动间隔的5-10

 

 

6.spark程序在启动时,会去这个checkpointPath目录下查看是否有保存的driver的元数据(1.dstream的操作转换关系,2.未处理完的batch)信息,当spark-streaming程序在二次启动后就会去checkpointPath目录下还原这个程序,加载未处理的batch元数据信息在内存中恢复,继续进行任务处理

 

 

 

7.为了保证spark-streaming程序7*24小时运行,那么我们程序应该具备高可靠性,怎样具备高可靠性?

 

a.程序出现故障,driver死掉了,流式程序应该具备自动重启的功能

b.没有计算完成的rdd在程序异常停止后,下次启动后还会将未处理的rdd进行处理

【注意】:要在spark_submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动driver,但是要能够自动重启driver,就必须将其值设置为cluster;此外,需要添加--supervise参数,失败后自动重启

//spark_submit --executor-memory 1g --total-execute-cores 5 --deploy-model cluster --supervise

 

 

 

8.启用预写机制

a.预写日志机制,简写为WAL,全称为Write Ahead Log,从spark1.2版本开始,就引入了基于容错的文件系统的WAL机制。如果启用该机制,Receiver接收到的所有数据都会写入配置的checkpoint目录中的预写日志。这中机制可以让driver在恢复的时候,避免数据丢失,并且可以确保整个实时计算过程中零数据丢失

 

b.要配置该机制,首先调用StreamingContextcheckpoint()方法设置一个checkpoint目录,然后需要将spark.streaming.receiver.writeAheadLog.enable参数设置为true

然而,这种极强的可靠性机制,会导致Receiver的吞吐量大幅度下降,因为单位时间内,有相当一部分时间需要将数据写入预写日志。如果又希望开启预写日志机制,确保数据零损失,又不希望影响系统的吞吐量,那么可以创建多个输入DStream,启动多个Receiver

此外,在启用了预写日志机制之后,推荐将复制持久化机制禁用掉,因为所有数据已经保存在容错的文件系统中,不需要在用复制机制进行持久化,保存一份副本,只要将输入的DStream的持久化机制设置一下即可,persistStorageLevel.MEMORY_AND_DISK_SER)。

 

9.spark-Streaming checkpoint概述

每一个spark-streaming应用,正常来说,都是7*24小时运转的,这就是实时计算程序的特点,因为要持续不断的对数据进行计算,因此,对实时计算应用的要求,应该是必须要能够对与应用程序逻辑无关的失败,进行容错

如果要实现这个目标,Spark Streaming程序就必须将足够的信息checkpoint到容错的存储系统上,从而让它能够从失败中进行恢复

有两种数据需要被进行checkpoint

1.元数据checkpoint-将定义了流式计算逻辑的信息,保存到容错的存储系统上,比如HDFS,当运行spark Streaming应用程序的Driver进程所在的节点失败时,该信息可以用于进行恢复,元数据信息包括:

a.配置信息---创建spark Streaming应用程序的配置信息,比如sparkConf中的信息

b.DStream的操作信息---定义了spark Stream应用程序的计算逻辑的DStream操作信息

c.未处理的batch信息---那些job正在排队,还没处理的batch信息

 

2.数据checkpoint---将实时计算过程中产生的RDD的数据保存到可靠的存储系统中。对于一些将多个batch的数据进行聚合,有状态的transformation操作,这是非常有用的,在这种transformation操作中,生成的RDD是依赖于之前batchRDD的,这会导致随着时间的推移,RDD的依赖链条变得越来越长,要避免由于依赖链条越来越长,导致的一起变得越来越长的失败恢复时间,有状态的transformation操作执行过程中中间产生的RDD,会定期被checkpoint到可靠的存储系统上,比如HDFS,从而削减RDD的依赖链条进而缩短失败恢复时,RDD的恢复时间。一句话概括,元数据checkpoint主要是为了从driver失败中进行恢复;而RDD checkpoint主要是为了使用到有状态的transformation操作时,能够在其生产出的数据丢失时,进行快速的失败恢复

 

10.何时启用checkpoint机制?

 

a.使用了有状态的transformation操作----比如updateStateByKey,或者reduceByKeyAndWindow操作被使用了,那么checkpoint目录要求必须提供的,也就是必须开启checkpoint机制,从而进行周期性的RDD checkpoint

b.要保证可以从Driver失败中进行恢复-----元数据checkpoint需要启用,来进行这种情况的恢复

【注意】并不是说,所有的spark streaming应用程序,都要启用checkpoint机制,如果既不强制要求从Driver失败中自动进行恢复,又没使用有状态的transformation操作,那么就不需要启用checkpoint,事实上,这么做反而是有助于提升性能的

 

11.如何自动从Driver失败中恢复过来

 

要能够自动从Driver失败中恢复过来运行spark Streaming应用程序的集群,就必须监控Driver运行的过程,并且在他失败时将他重启,对于spark自身的standalone模式,需要进行一些配置去supervise driver,在他失败时将其重启

 

首先,要在spark-submit中,添加--deploy-mode参数,默认其值为client,即在提交应用的机器上启动Driver,但是,要能够自动重启Driver,就必须将其值设置为cluster,此外,需要添加--supervise参数

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/832611
推荐阅读
相关标签
  

闽ICP备14008679号