赞
踩
本篇博文是小二由工作经验所得,纯属个人所思所感!!!
小二讲堂:https://blog.csdn.net/Mirror_w
1.spark简介:
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行计算框架,Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
伯克利数据分析堆栈
MapReduce:基于磁盘
Spark:基于内存
都是分布式计算框架,Spark基于内存,MR基于HDFS,sprk处理数据的能力一般是MR的十倍以上,Spark还有DAG有向无环图来切分任务执行的流程,这样对比可以看出Sark的高效了。
RDD处理数据:RDD是逻辑概念不存放数据的,里面的partition是稍微具体的里面放的是一个个的计算逻辑。例如:flatMap().map等。
HDFS上的每个block,首先是位于不同节点之上的,而通过sc.TestFile(“path”)获取到的每个block数据块的集合就是一个RDD,而每个RDD就是由一系列partition组成的,在这里每个partition会由每个job Task去执行,这样的并行执行粒度保证了高效,而且处理RDD后的结果数据是基于内存进行存放的。
还有就是函数都是作用在每个Partition上的即是基于每个split上的。
当RDD1中的Job Task执行的job作业执行完毕时,由RDD1对象的FlatMap方法对数据进行处理,在这里当然是进行切片了,同样的这里的RDD1里的partition分别对应着RDD2中的partitioin,将切分后的数据进行一些列的处理。
注意:底层读取文件时还是一个MR任务进行读取,RDD是一个逻辑概念。
补充:Spark的运行模式:
local:
Standalone:
Yarn:
Mesos:资源调度框架
每次有RDD上的函数进行数据的处理的结果当然会越来越“小”或者越来越“简练”这里可将RDD中的partition进行设置,进行减少,当然这里是视情况了,当数据集小了可以减少partition,因为partition的数量决定了task的数量。
注意:在这里每个RDD是逻辑上的概念,大数据核心思想“计算向数据移动”是不变得,这些RDD逻辑是由对应的task进行分步执行的。当RDD2数据丢失时,可以基于RDD1的一个算子(这里是flatmap()方法)生成RDD2,这样达到了RDD之间的依赖,这也是RDD的特性之一。
RDD源码中可以看出以上的特性:
RDD对应的Scala代码(WordCount):
利用数据加快加载,在其他的In-Memory类数据库或Cache类系统中也有实现,Spark主要就是采用血统lineage来实现分布式运行坏境下的数据容错(节点失效、数据失效)问题。RDD Lineage 被称为RDD运算图或者RDD依赖关系图。它是在RDD上执行transformations函数并创建逻辑执行计划(logical execution plan)的结果,是RDD的逻辑执行计划。相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage找到丢失的父RDD的分区进行局部计算来恢复丢失的数据,这样可以节省资源提高运行效率。这种粗颗粒的数据模型,限制了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
在这里将算子可以理解为函数,例如wordount代码中的flatMap()、map()函数。每个算子都会在对应的partition上进行执行,并且将数据进行处理后的结果交还给写一个算子。每个算子的执行最终是由Actor进行触发,RDD内开始的时候是没有数据的当开始执行的时候只是逻辑上的轮行,当最后有action进行触发的时候,所有的RDD才会进行数据的处理。即这里的RDD是懒加载的。
在上面的代码中,算子作用在RDD上的,当没有action算子的触发的时候,RDD中是没有数据的,只有当action算子进行触发的时候才会真正的执行代码,进行数据的加载
-transformations延迟执行针对RDD的操作
Transformationi类算子,也叫转换算子,懒执行,
Action算子:也叫行动算子,用于触发Transformation的算子的执行,一个spark application中有一个action就对应有一个job
持久化算子:
1)基本的Transformation算子.
filter
过滤符合条件的数据,true是保留,,false是过滤掉
-map
将RDD中的每一个数据项,通过map中的函数映射关系将进来的数据映射为一条新的元素。
特点:输入一条数据,输出一条数据
-flatMap
先进行map后进行flat,与map类似,每个输入项可以映射为0到多个与元素。
sample
随机抽样算子,根据传递的比例进行又放回和不放回的抽样
reduceByKey
将响应的key根据响应的逻辑进行处理
-sortByKey sortBy
作用在K,V格式的RDD上,对key进行升序或者降序的排序
2.伪代码分析:
lines=sc.textFile(“path”)
error=lines.filter(_.endwith(“error”))
mysql_error=error.filter(.contain(“mysql”)).count
http_errot=error.filter(.contain(“http”)).count
名词解释:Application:就是一段从程序入口到结束的代码即就是一个应用程序
在这里一个Application对应的是两个job。
补充:transformation算子
Ø join,leftOuterJoin,rightOuterJoin,fullOuterJoin
作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W))
¬ join后的分区数与父RDD分区数多的那一个相同。
Ø union
合并两个数据集。两个数据集的类型要一致。
¬ 返回新的RDD的分区数是合并RDD分区数的总和。
Ø intersection
取两个数据集的交集,返回新的RDD与父RDD分区多的一致
Ø subtract
取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。
Ø mapPartitions
与map类似,遍历的单位是每个partition上的数据。
Ø distinct(map+reduceByKey+map)
Ø cogroup
当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable,Iterable)),子RDD的分区与父RDD多的一致。
action
Ø foreachPartition
遍历的数据是每个partition的数据。
Ø count
返回数据集中的元素数。会在结果计算完成后回收到Driver端。负责计算每个worker中有多少条数据,只是将数据结果返回。
Ø take(n)
返回一个包含数据集前n个元素的集合。例如take(1)就是获取第一条数据,相当于下面的first算子
Ø first
first=take(1),返回数据集中的第一个元素。
Ø foreach
循环遍历数据集中的每个元素,运行相应的逻辑。
Ø collect
将计算结果回收到Driver端,进行收集收据,将数据进行返回到Driver,如果数据过多会造成Driver的OOM(内存溢出)
持久化算子有三种,cache、persist、checkpoint,以上算子都可以将RDD进行持久化,持久化的一个单位就是prartition,cache和persist都是懒执行了,必须有一个action行动算子的触发才会执行,出就话算子不仅可以将RDD进行持久化到磁盘还可以将RDD之间切断他们之间的依赖关系。
是将RDD持久化到内存中。cache是懒执行的。默认是将数据持久化到内存
代码测试磁盘和内存的效率:
val conf = new SparkConf() conf.setMaster("local") conf.setAppName("cachetest") val sc = new SparkContext(conf) val lines: RDD[String] = sc.textFile("./data/words") //使用cache将数据加载到内存 lines.cache() //使用count行动算子 val startTimes1 = System.currentTimeMillis() lines.count() val endTimes1 = System.currentTimeMillis() println(s"磁盘: times=${endTimes1-startTimes1}") //再次使用count,这次将从内存中拿到的数据 val startTimes2 = System.currentTimeMillis() lines.count() val endTimes2 = System.currentTimeMillis() println(s"内存: times=${endTimes2-startTimes2}") 结果:磁盘: times=1365 内存: times=95
可以手动指定持久化的级别
cache()=persist()=persist(StorageLevel.MEMORY_ONLY)
由图可以看出Spark中默认的副本有1个
上图是持久化的级别:
_UseDisk:是否使用到磁盘
_useMemory:是否使用内存
_useOffHeap:是否使用堆外内存
_deserialized:是否不进行序列化
_replication:副本数
最常用的有:
MEMORY_ONLY:
val MEMORY_ONLY = new StorageLevel(使用磁盘, 使用内存, 不使用堆外内存, 不序列化)
val MEMORY_AND_DISK = new StorageLevel(使用磁盘, 使用内存, 不使用堆外内存, 序列化)
val MEMORY_ONLY_SER = new StorageLevel(不使用磁盘, 使用内存, 不适用堆外内存, 序列化)
val MEMORY_AND_DISK_SER = new StorageLevel(不使用, 使用内存, 不适用堆外内存, 序列化)
优先使用内存,内存不够再使用磁盘
1.cache和persist都是懒执行,必须有一个action类算子触发执行。
2.cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
3.cache和persist算子后不能立即紧跟action算子。
4.cache和persist算子持久化的数据当applilcation执行完成之后会被清除。
错误:rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
checkpoint 的执行原理:
1.当RDD的job执行完毕后,会从finalRDD从后往前回溯。
2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
3.Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
1.checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系,checkpoint中的数据当application执行完毕之后不会清除checkpoint目录中的数据。
2.当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记
3.Spark框架会启动一个新的job,重新对这个RDD进行计算,将数据放到HDFS上
优化:对RDD执行checkpoint,先对这个RDD进行了cache,这样新启动的job,只需要将内存中的数据拷贝到HDFS上。省去了重新计算这一个步骤。
Ø 提交命令
./spark-submit–master yarn --class org.apache.spark.examples.SparkPi …/lib/spark-examples-1.6.0-hadoop2.6.0.jar100
或者
./spark-submit–master yarn–client --class org.apache.spark.examples.SparkPi …/lib/spark-examples-1.6.0-hadoop2.6.0.jar100
或者
./spark-submit–master yarn–deploy-mode client --class org.apache.spark.examples.SparkPi …/lib/spark-examples-1.6.0-hadoop2.6.0.jar100
Ø 执行原理图解
Ø 执行流程
Ø 提交命令
./spark-submit–master yarn–deploy-mode cluster --class org.apache.spark.examples.SparkPi …/lib/spark-examples-1.6.0-hadoop2.6.0.jar100
或者
./spark-submit–master yarn-cluster–class org.apache.spark.examples.SparkPi …/lib/spark-examples-1.6.0-hadoop2.6.0.jar100
Ø 执行原理图解
Ø 执行流程
执行流程
1.client模式提交任务后,会在客户端启动Driver进程。
2.Driver会向Master申请启动Application启动的资源。
3.资源申请成功,Driver端将task发送到worker端执行。
4.worker将task执行结果返回到Driver端。
总结
client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。
提交命令
./spark-submit --master spark://node1:7077 --deploy-mode cluster
--class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
执行原理图解
执行流程
1.cluster模式提交应用程序后,会向Master请求启动Driver.
2.Master接受请求,随机在集群一台节点启动Driver进程。
3.Driver启动后为当前的应用程序申请资源。
4.Driver端发送task到worker节点上执行。
5.worker将执行情况和执行结果返回给Driver端。
总结
Driver进程是在集群某一台Worker上启动的,在客户端是无法查看task的执行情况的。假设要提交100个application到集群运行,每次Driver会随机在集群中某一台Worker上启动,那么这100次网卡流量暴增的问题就散布在集群上。
总结Standalone两种方式提交任务,Driver与集群的通信包括:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。