当前位置:   article > 正文

Spark入门简介_.groupby(x=>

.groupby(x=>

一、Spark简介
Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,
可用于构建大型的、低延迟的数据分析应用程序。

Spark特点:
1.运行速度快:Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,
基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍;
2.容易使用:Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,
并且可以通过Spark Shell进行交互式编程;
3.通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,
这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
4.运行模式多样:Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,
并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。

Spark vs Hadoop

HADOOP的局限SPARK的改进
抽象层次低,代码编写难以上手通过使用RDD的统一抽象,实现数据处理逻辑的代码非常简洁。
只提供了Map和Reduce两个操作,欠缺表达力RDD提供了许多转换和动作,实现了很多基本操作,比如Sort,Join等。
一个Job只有Map和Reduce两个节点,复杂的程序需要大量的Job来完成,且Job之间的依赖关系需要开发者自行管理一个Job可以包含多个RDD的转换操作,Spark在调度时会生成多个Stage,一个Stage可以包含多个Map操作,只需要Map操作所使用的的RDD分区保持不变
处理逻辑隐藏在代码细节中,缺少整体逻辑视图RDD的转换支持流式API,提供处理逻辑的整体视图
对迭代式数据处理性能查询,Reduce与下一步Map之间的中间结果只能放在HDFS文件系统中通过内存缓存数据,可大大提高迭代计算性能,内存不足时溢出到磁盘上。
时延高,只适用于批数据处理,对交互式和实时处理支持不够将流拆成小的Batch,提供Stream处理流数据

二、Spark生态介绍
Spark的生态系统主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等组件,
1.Spark Core:
Spark的基本功能,包括任务调度,内存管理,容错机制等;
内部定义了RDDs(弹性分布式数据集),提供了很多APIs来创建和操作这些RDDs。
Spark的核心模块专注于调度和管理虚拟机之上分布式计算任务的执行,集群中的计算资源则教给Cluster Manager角色来管理。
Cluster Manager可以为自带的Standalone、第三方Yarn和Mesos。
Cluster Manager一般采用Master-Slave结构,以Yarn为例,部署ResourceManager服务的节点为Mster,
负责集群中所有计算资源的统一管理和分配; 部署NodeManger服务的节点为Slave,负责当前节点创建一个或多个
具备计算能力的JVM实例,在Spark中,这些节点也叫作Worker。
另外还有一个Client节点的概念,是指用户提交Spark Application时所在的节点。

2.Spark SQL:
Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个
重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析。
应用场景,企业中用来做报表统计
3.Spark Streaming:
Spark Streaming支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流式计算分解成
一系列短小的批处理作业。Spark Streaming支持多种数据输入源,如Kafka、Flume和TCP套接字等。
应用场景,企业中用来从kafka接收数据做实时统计。
4.Mlib
一个包含通用机器学习功能的包,Machine learning lib,包含分类,聚类,回归等,还包括模型评估,和数据导入。
Mlib提供的上面这些方法,都支持集群上的横向扩展。
应用场景,机器学习。
5.Graphx:
是处理图的库(例如,社交网络图),并进行图的并行计算。
像Spark Streaming,Spark SQL一样,继承了RDD API,它提供了各种图的操作和常用图的算法。
应用场景,图计算。

三、Spark基本概念
RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是Spark分发数据和计算的基础抽象。
DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系;
Application:用户编写的Spark应用程序;
Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行任务(TASK),并并且负责将数据存在内存或者磁盘上;
JOB:在一个Application中,以Action为划分边界的Spark批处理作业。前面提到,Spark采用惰性机制,对RDD的创建和转换并
不会立即执行,只有在遇到第一个Action时才会生成一个Job,然后统一调度执行。一个Job包含N个Transformation和1个Action。
从数据的创建开始,经过 Transformation, 结尾是 Action 操作.这些操作对应形成一个有向无环图(DAG),形成 DAG 的先决条件是最后的函数操作是一个Action。
Stage:一个Job中,以Shuffle为边界划分出的不同阶段。每个阶段包含一组可以被串行执行的窄依赖或宽依赖操作:
用户提交的计算任务是一个由RDD构成的DAG,如果RDD在转换的时候需要做Shuffle,那么这个Shuffle的过程就将这个DAG分为了不同的阶段(即Stage)。
由于Shuffle的存在,不同的Stage是不能并行计算的,因为后面Stage的计算需要前面Stage的Shuffle的结果。在对Job中的所有操作划分Stage时,一般会按照倒序进行,
即从Action开始,遇到窄依赖操作,则划分到同一个执行阶段,遇到宽依赖操作,则划分一个新的执行阶段,且新的阶段为之前阶段的parent,然后依次类推递归执行。
child Stage需要等待所有的parent Stage执行完之后才可以执行,这时Stage之间根据依赖关系构成了一个大粒度的DAG。
在一个Stage内,所有的操作以串行的Pipeline的方式,由一组Task完成计算。

Task:
对一个Stage之内的RDD进行串行操作的计算任务。每个Stage由一组并发的Task组成(即TaskSet),这些Task的执行逻辑完全相同,只是作用于不同的Partition。一个Stage的总Task的个数由Stage中最后的一个RDD的Partition的个数决定。

例子说明(JOB,STAG,Shuffle):
val arr=Array(“cat”,“dog”,“lion”,“monkey”,“mouse”)
//创建RDD
val rdd = sc.parallize(arr)
//map:“cat”->c,cat
val rdd1 = rdd.Map(x=>(x.charAt(0),x))
//按照相同的key分组统计和计算
val rdd2 = rdd1.groupBy(x=>x._1).Map(x=>x._1,x._2.toList.length)
val result = rdd2.collect()
//output:Array((d,1),(l,1),(m,2))

当你在解释器里一行行输入的时候,实际上 Spark 并不会立即执行函数,而是当你输入了val result = rdd2.collect()
的时候, Spark才会开始计算,从sc.parallize(arr) 到最后的collect,形成一个 Job。
shuffle是划分DAG中stage的标识,同时影响Spark执行速度的关键步骤。
//Map:“cat”->c,cat
val rdd1 = rdd.Map(x=>(x.charAt(0),x))
//按照相同的key分组统计和计算
val rdd2 = rdd1.groupBy(x=>x._1).Map(x=>x._1,x._2.toList.length)
第一个 Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化,
第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,
所以造成了数据在内存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.
运行到STAG的边界时,数据在父STAG中按照TASK写到磁盘中,而在子STAG中通过网络按照TASK读取数据。
这些操作会导致很重的网络以及磁盘I/O。所以STAGE的边界时非常占资源的,在编写SPARK程序的时候尽量避免。

Spark结构设计:
Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的
任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。
其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。
在这里插入图片描述

Spark各种概念之间的关系
在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。

Spark运行基本流程:

1.当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个
SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源;
2.资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着"心跳"发送到资源管理器上;
3.SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个"阶段"
(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个"任务集"提交给底层的任务调度器(TaskScheduler)进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor;
4.任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
在这里插入图片描述

Spark运行架构的特点
1.每个应用都有自己专属的Executor进程,并且该进程在应用运行期间一直驻留。Executor进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠
2.Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可;
3.Executor上有一个BlockManager存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入到HDFS等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写IO性能;
4.任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即"计算向数据靠拢",因为移动计算比移动数据所占的网络资源要少得多。而且,Spark采用了延时调度机制,可以在更大的程度上实现执行过程优化。
比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么,调度就会等待,直到当前节点可用。

四、RDDs介绍

1.Driver program:
Spark在执行每个Application的过程中会启动Drive和Executor两种JVM进程:
Driver进程中为主控进程,负责执行用户Application的main方法,提交JOB,并将JOB转化为Task,在各个Executor进程间协调Task的调度。
运行在Worker的Executor的进程负责执行Task,并把结果返回给Driver,同时为需要缓存的RDD提供存储功能。

2.SparkContext:
Spark最重要的API,用户逻辑与Spark集群的交互接口,它会与Cluster Master交互,包括向它申请计算资源。
在Shell中SparkContext自动创建好了,就是sc.

3.RDDSs
弹性分布式数据集,Spark对所处理的数据的基本抽象。
val lines = sc.textFile("/xxx.txt"),将xxx.txt创建为RDD中,lines就是RDD。
Spark中的计算可以简单抽象为对RDD的创建、转换和返回操作结果的过程:
3.1 创建:通过加载外部物理存储(HDFS)中的数据集,或Application中定义的对象集合(List)来创建。 RDD在创建后不可改变,只可以对其执行下面的两种操作。
3.2 转换(Trasnformation):对已有的RDD中的数据执行计算进行转换,而产生新的RDD,在这个过程中有时会 产生中间RDD。Spark对于Transformation采用惰性计算机制,遇到Transformation时并不会立即计算结果, 而是等遇到Action时一起执行。
3.3 行动(Action):对已有的RDD中的数据执行计算产生结果,将结果返回Driver程序或写入外部物理存储。在Action过程中同样有可能生成中间RDD。

4.分片
一个RDD在物理上被切分为多个Partition,即数据分区,这些Partition可以分布在不同的节点上。Partition是Spark计算任务的基本处理单位,决定了并行计算的颗粒度,而Partition中的每一条Record为基本处理对象。例如对某个RDD进行map操作,在具体执行时是由多个并行的Task对各自分区的每一条记录进行map映射。

5.RDDs的依赖
对RDD的Transformation或Action操作,让RDD产生了父子依赖关系(事实上,Transformation或Action操作生成的中间RDD也存在依赖关系),这种依赖分为宽依赖和窄依赖两种:
NarrowDependency (窄依赖)
parent RDD中的每个Partition最多被child RDD中的一个Partition使用。让RDD产生窄依赖的操作可以称为窄依赖操作,如map、union。
WideDependency(或ShuffleDependency,宽依赖)parent RDD中的每个Partition被child RDD中的多个Partition使用,这时会依据Record的key进行数据重组,这个过程即为Shuffle(洗牌)。让RDD产生宽依赖的操作可以称为宽依赖操作,如reduceByKey, groupByKey。

6.Shuffle
有一部分Transformation或Action会让RDD产生宽依赖,这样过程就像是将父RDD中所有分区的Record进行了"洗牌"(Shuffle),数据被打散重组,如属于Transformation操作的join,以及属于Action操作的reduce等,都会产生Shuffle。

7.RDDs的血统关系图:
Spark维护Rdds之间的依赖关系和创建关系,叫做血统关系图。Spark使用血统关系图来计算每个RDD的需求和恢复丢失的数据

8.延迟计算:(Lazy Evaluation)
Spark对RDDs的计算是,第一次使用action操作的时候,这种方式在处理大数据的时候特别有用,可以减少数据的传输。加载数据也是延迟计算,数据只有在必要的时候才会加载进去。

9.persist
通过RDD的persist方法,可以将RDD的分区数据持久化在内存或硬盘中,通过cache方法则是缓存到内存。这里的persist和cache是一样的机制,只不过cache是使用默认的MEMORY_ONLY的存储级别对RDD进行persist,故"缓存"也就是一种"持久化"。前面提到,只有触发了一个Action之后,Spark才会提交Job进行真正的计算。所以RDD只有经过一次Action之后,才能将RDD持久化,然后在Job间共享,即如果两个Job用到了相同的RDD,那么可以在第一个Job中对这个RDD进行缓存,在第二个Job中就避免了RDD的重新计算。持久化机制使需要访问重复数据的Application运行地更快,是能够提升Spark运算速度的一个重要功能。若没有缓存再次执行时会按照血系重新计算。
缓存的级别:
MEMORY_ONLY(内存空间占用高,CPU消耗低)
MEMORY_ONLY_SER(序列化后保存内存,内存空间占用低,CPU消耗高)
DISK_ONLY(内存空间占用低,磁盘占用高,CPU消耗高)
MEMORY_AND_DISK(内存放不下就放在磁盘上)
MEMORY_AND_DISK_SER(内存放不下就放在磁盘上,内存中数据序列化)
unpersist()
从缓存中移除

10.Checkpoint
调用RDD的checkpoint方法,可以将RDD保存到外部存储中,如硬盘或HDFS。Spark引入checkpoint机制,是因为持久化的RDD的数据有可能丢失或被替换,checkpoint可以在这时候发挥作用,避免重新计算。

11.RDD的运行原理

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个Task处理的数据不同而已。一个stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的Task的输入数据就是上一个stage输出的中间结果。
如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。
这个过程就是shuffle。当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

五、RDDs开发语法

1.把一个存在的集合传给SparkContext的parallelize()方法。
val rdd = sc.parallelize(Array(1,2,2,4),4)
第一个参数:待并行化处理的集合
第二个参数:分区个数
rdd.count() --统计个数
rdd.foreach(println)
2 加载外部数据集
val rddText = sc.textFile(“helloSpark.txt”);

3.Scala的变量声明
在Scala创建变量时,必须使用val或者var
val,变量值不可修改,一旦分配不能重新指向别的值
var,分配后可以指向类型相同的值。

4.Scala的匿名函数和类型推断
lines.filter(line=>line.contains(“Hello”))
filter中定义一个匿名函数,过滤出只包含Hello的记录。
line的类型不需指定,能够推断出来。

5.wordCount示例程序
object WordCount{
def main(args: Array[String]){
val conf = new SparkConf().setAppName(“wordcount”)
val sc = new SparkContext(conf)
val input=sc.textFile(“helloSpark.txt”)
val lines=input.flatMap(line=>line.split(" "))
val count=lines.map(word=>(word,1)).reduceByKey(case(x,y)=>x+y)
val output=count.saveAsTextFile(“helloSparkResult”)
}
}

6.Transformation
转换,从之前的RDD构建一个新的RDD,像map()和filter()

逐元素Transformation
map():
map()接收函数,把函数应用到RDD的每一个元素中,返回新RDD。
示例:
val lines = sc.parallelize(Array(“hello”,“spark”,“hello”,“world”,"!"))
val lines2 = lines.map(word=>(word,1))–把每个元素增加,1
lines2.foreach(println) --(hello,1) (spark,1)…
filter():
filter接收函数,返回只满足filter()函数的元素的新RDD
val lines3 = lines.filter(word=>word.contains(“hello”)) --过滤出hello的元素
flatMap():
对每个输入元素,输出多个输出元素。
flat压扁的意思,将RDD中元素压扁后返回一个新的RDD。
val inputs =sc.textFile(“helloSpar.txt”)
val lines = inputs.flatMap(line=>line.split(" ")) --将空格分隔每个元素,并换行

7.集合运算
val rdd1 = sc.parallelize(Array(“coffe”,“coffe”,“panda”,“monkey”,“tea”))
val rdd2 = sc.parallelize(Array(“coffe”,“monkey”,“kitty”))
val rdd_distinct=rdd1.distinct() --去重
val rdd_union = rdd1.union(rdd2) --合集
val rdd_inter=rdd1.intersection(rdd2)–并集
val rdd_sub=rdd1.subtract(rdd2)–在rdd1中存在rdd2中不存在

8.RDD基本操作之Action
Action介绍:
在RDD上计算出来一个结果。
把结果返回给Driver program或保存在文件系统,比如count(),save
reduce()
接收一个函数,作用在RDD两个类型相同的元素上,返回新元素。
可以实现,RDD中元素的累加,计数,和其他类型的聚集操作
val rdd=sc.parallelize(Array(1,2,3,3))
val sum=rdd.reduce((x,y)=>x+y) --x,y代表相同的元素,进行累加
collect():
遍历整个RDD,向driver program返回RDD的内容
需要单击内存能够容纳下(因为数据要拷贝给driver,测试使用)
大数据的时候,使用saveAsTextFile() action等
take(n):
返回RDD的n个元素(同时尝试访问最少的partitions)
返回结果是无序的,测试使用。
top()
排序(根据RDD数据的比较器)
foreach()
计算rdd中每个元素,但不返回到本地。
可以配合println()友好的打印出数据
9.Key/Value对RDDs
创建KeyValue对RDDs:
使用map()函数,返回key/value对
val rdd=sc.textFile(“helloSpark.txt”);
val rdd2=rdd.map(line=>(line.split(" ")(0),line))–以空格分隔取第一个单词作为key
基于集合创建
val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)));

KeyValue对RDDs的Transformations(example:{(1,2),(3,4),(3,6)})
reduceByKey(func):
把相同key的结果进行整合,rdd.reduceByKey((x,y)=>x+y)
结果为{(1,2),(3,10)}

groupByKey():
把相同的key的values分组,rdd.groupByKey()
{(1,[2]),(3,[4,6])}

mapValues(func):
函数作用于pairRDD的每个元素,key不变。比如rdd.mapValues(x=>x+1)
结果:{(1,3),(3,5),(3,7)}
Keys():
仅返回Keys
values():
仅返回values
sortByKey():
按照key排序的RDD

10.combineByKey():
(createCombiner,mergeValue,mergeCombiners,partitioner)
最常用的基于key的集合函数,返回类型与输入类型可以不同
许多基于key的聚合函数都用到了它,像groupByKey()

combineByKey():
遍历partition中的元素,元素的key,要么之前见过,要么不是
如果是新元素,则使用我们提供的createCombiner()函数
如果是partition中已存在的key,就会使用mergeValue()函数
合计全部partition结果的时候,使用mergeCombiners()函数

例子:求平均值
val scores=sc.parallelize((“jake”,80),(“jake”,90),(“jake”,95),(“mike”,85),(“mike”,92),(“mike”,90))
–score代表value
val score2=scores.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c_1.2+newScore),(c1:(int,Double),c2:(int,Double)=>(c1._1+c2._1,c1._2+c2._2)))
–通过case when判断类型结构
val average=score2.map{case(name,(num,score))=>(name,score/num)}

参考以下:
https://blog.csdn.net/qq_17677907/article/details/88685705
Spark原理简述:https://zhuanlan.zhihu.com/p/34436165
视频教程:https://www.imooc.com/learn/814
Spark快速入门:https://ifeve.com/spark-quick-start/

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/832617
推荐阅读
相关标签
  

闽ICP备14008679号