当前位置:   article > 正文

大数据分析工程师入门8--Spark基础

dataframe的操作可以分为两种类型:[1] 转换操作和[2] 行动操作


本文为《大数据分析师入门课程》系列的第8篇,主要讲解大数据分析师必须了解的Spark基础知识,前7篇分别是JAVA基础、SCALA基础、SQL基础、SQL进阶、HIVE基础、HIVE进阶、HDFS&YARN基础

依照惯例,首先,我们就以下三个问题进行简单说明。

  • 为什么讲Spark?

  • 本文的主要目标是什么?

  • 本文的讲解思路是什么?

为什么讲Spark?

随着并行数据分析变得越来越流行,各行各业的使用者们迫切需要更好的数据分析工具,Spark 应运而生。作为MapReduce的继任者,Spark具备以下优势特性。

1.高效性

内存计算下,Spark 比 MapReduce 快100倍。Spark使用最先进的DAG调度程序、查询优化程序和物理执行引擎,实现批量和流式数据的高性能。

2.易用性

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建多样的应用。

3.通用性

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。

这些不同类型的处理都可以在同一个应用中无缝使用。这对于企业应用来说,就可使用一个平台来进行不同的工程实现,减少了人力开发和平台部署成本。

4.兼容性

Spark能够跟很多开源工程兼容使用。如Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。

对于任何一家已经部署好Hadoop基础集群的企业来说,在不需要进行任何数据迁移和处理的情况下,就可以快速使用上Spark强大的数据处理和计算能力。

本文的主要目标是什么?

通过以上对Spark几大特性的介绍,可以看出Spark可支持开展多种大数据相关工作,比如大数据算法、大数据底层架构开发、数据分析等。

由于本文属于《大数据分析师入门课程》系列,故本文针对Spark的内容讲解主要是围绕数据分析工作中经常涉及到的核心概念和常用的操作等相关知识点。

学习完本文,你将会对如何使用Spark完成数据分析工作有一个更深入的理解。

本文的讲解思路是什么?

第1部分,主要讲解Spark相比MapReduce的优势。

第2部分,主要跟大家一起来看下Spark生态系统具体包括哪些组成部分。

第3部分,针对Spark的三种主要数据组织类型进行一一介绍、对其三者之间的异同点进行总结以及给出三者之间转化的方式。

第4部分,共享变量,包括广播变量和累加器。

第5部分,主要介绍Spark中使用YARN进行资源管理时,任务的提交流程和方式。

 

话不多说,让我们直接开始吧!

01

SPARK与MAPREDUCE比较

MapReduce作为Hadoop的两大重要组成部分之一,相信绝大部分从业者在大数据学习之初,都学习过这个分布式计算框架。

Spark继承了其分布式并行计算的优点,并改善了MapReduce的几个不足,具体表现为以下几个方面。

1.Spark可以把中间结果放在内存中,迭代运算效率高

MapReduce把中间计算结果放在磁盘上,这样势必会影响整体的运行速度,而Spark使用DAG分布式并行计算编程框架,缩短了中间处理环节,提高了处理效率。

2.Spark容错性高

Spark引进了弹性分布式数据集(Resilient Distribued Dataset,RDD)的概念,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,可以对它们进行重建。另外,RDD在计算时,可以通过CheckPoint来实现容错。

3.Spark更加易用

Spark提供了丰富的开箱即用的数据集操作算子,使处理和分析数据变得更加简单。

02

SPARK生态系统

前面对Spark的特性和优势介绍了很多,接下来让我们直接来看看Spark生态系统的全貌吧。

Spark已经发展成为包含众多子项目的大数据计算平台。 

伯克利(Spark的创造者,加州大学伯克利分校)将Spark的整个生态系统称为伯克利数据分析栈(BDAS)。

以SparkCore为核心,能够读取本地文件(如文本文件)、HDFS、Amazon S3、Hbase和Alluxio等数据源,利用Standalone、EC2、MESOS、YARN等资源调度管理,完成应用程序对数据的分析与处理,这些应用程序来自Spark的不同组件,包括Spark Streaming实时流处理应用、SparkSQL的即席查询、采样近似查询引擎BlinkDB的权衡查询、MLBase/Mllib的机器学习、GraphX的图处理和SparkR的数据计算等。

其中,大数据分析工作涉及到的基础知识点有SparkCore中的RDD、SparkSQL(系列课程的下一篇文章将会单独讲解SparkSQL)、如何进行本地运行测试、如何定义和使用广播变量和累计器、如何在YARN上运行等相关知识点。

这里首先简单介绍下SparkCore。

SparkCore是整个BDAS生态系统的核心组件,是一个分布式大数据处理框架,SparkCore提供了多种资源调度管理,通过内存计算、有向无环图(DAG)等机制保证分布式计算的快速,并引入了RDD的抽象保证了数据的高容错性。

 

接下来开始针对数据分析工作中涉及到的Spark相关知识点进行逐一讲解。

03

RDD&DataFrame&DataSet

Spark提供了三种数据集的抽象概念,RDD、DataFrame和DataSet,分别对应了同名的三个对象和三组不同的API,接下来我们来进行逐个讲解,并举例说明其通常的用法。

  • RDD

RDD全称Resilient Distributed Dataset,弹性分布式数据集,它是记录的只读分区集合,是Spark的基本数据结构,见名释义:

弹性,表现在两个方面,一是当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;二是RDD使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;

分布式,可分布在多台机器上进行并行计算;

数据集,一组只读的、可分区的分布式数据集合,集合内包含了多个分区,分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

 

在实际数据分析工作中,怎么样创建一个RDD呢?

创建方式一般有以下两种:

1.使用程序中的集合创建RDD,RDD的数据源是程序中的集合,通过parallelize或者makeRDD将集合转化为 RDD。

  1. val num = Array(1,2,3,4,5)
  2. val rdd = sc.parallelize(num)
  3. // 或者
  4. val rdd = sc.makeRDD(num)

2.使用本地文件或HDFS创建RDD,RDD的数据源是本地文件系统或HDFS的数据,使用 textFile 方法创建RDD。

val rdd = sc.textFile("hdfs://hans/data_warehouse/test/data")

 

RDD支持两种类型的操作:转换(Transformation)和行动(Action)。

转换操作是从已经存在的数据集中创建一个新的数据集,而行动操作是在数据集上进行计算后返回结果到 Driver。

转换操作都具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发行动操作的时候,它才会根据 DAG 图真正执行。

数据分析工作中常用的转换与行动操作如下图所示:

转换算子

含义

map(func)

返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

filter(func)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成

flatMap(func)

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

mapPartitions(func)

类似于map,但独立地在RDD的每一个分片上运行

mapPartitionsWithIndex(func)

类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是

(Int, Interator[T]) => Iterator[U]

sample(withReplacement, fraction, seed)

根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子

union(otherDataset)

对源RDD和参数RDD求并集后返回一个新的RDD

interp(otherDataset)

对源RDD和参数RDD求交集后返回一个新的RDD

distinct([numTasks]))

对源RDD进行去重后返回一个新的RDD

groupByKey([numTasks])

在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD

reduceByKey(func, [numTasks])

在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置

aggregateByKey(zeroValue)(seqOp, combOp,  [numTasks])

先按分区聚合 再总的聚合   每次要跟初始值交流 例如:aggregateByKey(0)(_+_ , _+_)  对(K,V)的RDD进行操作

sortByKey([ascending], [numTasks])

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

sortBy(func,[ascending], [numTasks])

与sortByKey类似,但是更灵活  第一个参数是根据什么排序,第二个是怎么排序,false表示倒序,第三个参数表示排序后分区数 ,默认与原RDD一样

join(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD,相当于内连接(求交集)

cogroup(otherDataset, [numTasks])

在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

cartesian(otherDataset)

两个RDD的笛卡尔积

pipe(command, [envVars])

调用外部程序

coalesce(numPartitions)   

重新分区,第一个参数是要分多少区,第二个参数是否shuffle,默认false

repartition(numPartitions)

重新分区,必须shuffle,参数表示是要分多少区

repartitionAndSortWithinPartitions(partitioner)

重新分区+排序,比先分区再排序效率高,对(K,V)的RDD进行操作

foldByKey(zeroValue)(seqOp)

该函数用于(K,V)做折叠,合并处理  ,与aggregate类似   第一个括号的参数应用于每个V值  第二括号函数是聚合例如:_+_

combineByKey

合并相同的key的值,如rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m:  Int, n: Int) => m + n)

partitionBy(partitioner)

对RDD进行分区,partitioner是分区器

cache

RDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY,而persist则可以选择不同缓存级别

persist

Subtract(rdd)

返回前RDD元素不在后RDD的RDD

leftOuterJoin

leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。

rightOuterJoin

rightOuterJoin类似于SQL中的右外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可

subtractByKey

substractByKey和基本转换操作中的subtract类似只不过这里是针对K的,返回在主RDD中出现且不在otherRDD中出现的元素

 

动作算子

含义

reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

collect()

在驱动程序中,以数组的形式返回数据集的所有元素

count()

返回RDD的元素个数

first()

返回RDD的第一个元素(类似于take(1))

take(n)

返回一个由数据集的前n个元素组成的数组

takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

saveAsSequenceFile(path) 

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以是HDFS或者其他Hadoop支持的文件系统

saveAsObjectFile(path) 

先将RDD转为数组,然后序列化,然后将结果变换为(null,byteWritable)

countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数

foreach(func)

在数据集的每一个元素上,运行函数func进行更新

aggregate

先对分区进行操作,再总体操作

最后通过一段代码示例来看看具体怎么使用这些算子,这里会演示部分算子的使用情况,读者朋友们可以动手实际操作一下,以加强对以上所有算子的学习和了解。

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. object SparkWordCountWithScala {
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf()
  6. /**
  7. * 如果这个参数不设置,默认认为你运行的是集群模式
  8. * 如果设置成local代表运行的是local模式
  9. */
  10. conf.setMaster("local")
  11. //设置任务名
  12. conf.setAppName("WordCount")
  13. //创建SparkCore的程序入口
  14. val sc = new SparkContext(conf)
  15. //读取文件生成RDD
  16. val file: RDD[String] = sc.textFile("E:\\hello.txt")
  17. //把每一行数据按照,分割
  18. val word: RDD[String] = file.flatMap(_.split(","))
  19. //让每一个单词都出现一次
  20. val wordOne: RDD[(String, Int)] = word.map((_,1))
  21. //单词计数
  22. val wordCount: RDD[(String, Int)] = wordOne.reduceByKey(_+_)
  23. //按照单词出现的次数降序排序
  24. val sortRdd: RDD[(String, Int)] = wordCount.sortBy(tuple => tuple._2,false)
  25. //将最终的结果进行保存
  26. sortRdd.saveAsTextFile("E:\\result")
  27. sc.stop()
  28. }
  29. }

示例代码中,将运行模式设置为local模式,就可以在本地对代码进行调试。

  • DataFrame&DataSet

理解了RDD,DataFrame理解起来就比较容易了,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。

假设RDD中的两行数据长这样:

那么DataFrame中的数据长这样:

从上面两个图可以看出,DataFrame比RDD多了一个表头信息(Schema),像一张表了。

DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where ...),让我们通过代码示例来了解下DataFrame的常见操作。

  1. import org.apache.spark.sql.SparkSession
  2. val spark = SparkSession
  3. .builder()
  4. .appName("Spark SQL basicexample")
  5. .getOrCreate()
  6. //引入Spark的隐式类型转换,如将RDD转换成DataFrame
  7. import spark.implicits._
  8. val df = spark.read.json("/data/tmp/SparkSQL/people.json")
  9. df.show() //打印DataFrame的内容
  10. //+---+-------+
  11. //|age| name|
  12. //+---+-------+
  13. //| |Michael|
  14. //| 19| Andy|
  15. //| 30| Justin|
  16. //+---+-------+
  17. df.printSchema() //打印出DataFrame的表结构
  18. //root
  19. // |-- age: string (nullable = true)
  20. // |-- name: string (nullable = true)
  21. df.select("name").show()
  22. //等同于select name fromDataFrame的SQL语句
  23. df.select($"name",$"age" + 1).show()
  24. //等同于select name,age+1 from DataFrame的SQL语句
  25. //此处注意,如果对列进行操作,所有列名前都必须加上$符号
  26. df.filter($"age" > 21).show()
  27. //等同于select * fromDataFrame where age>21 的SQL语句
  28. df.groupBy("age").count().show()
  29. //等同于select age,count(age)from DataFrame group by age;
  30. //同时也可以直接写SQL进行DataFrame数据的分析
  31. df.createOrReplaceTempView("people")
  32. val sqlDF = spark.sql("selectage,count(age) from people group by age")
  33. sqlDF.show()

 

DataSet是DataFrame API的扩展。相较于RDD来说,DataSet提供了强类型支持,区别也是给RDD的每行数据加了类型约束。

假设RDD中的两行数据长这样:

那么DataSet中的数据长这样:

或者长这样(每行数据是个Object):

引入DataSet有两个重要原因:

1.对于分布式程序来讲,提交一次作业太费劲了(要编译、打包、上传、运行),如果提交到集群运行时才发现错误,实在是浪费时间,DataSet相比DataFrame来说,它提供了编译时类型检查。

示例如下:

代码中json文件中并没有score字段,但是用DataFrame能编译通过,运行时才报异常。

而使用DataSet实现,会在代码编写时就报错,出错提前到了编译之前。

2.引入DataSet另一个重要原因是RDD转换DataFrame后不可逆,但RDD转换DataSet是可逆的。

示例如下:

通过RDD创建DataFrame,再通过DataFrame转换成RDD,发现RDD的类型变成了Row类型。

通过RDD创建DataSet,再通过DataSet转换为RDD,发现RDD还是原始类型。

 

因为DataSet吸收了RDD和DataFrame的优点,实际工作使用中,所以可以像操作RDD和DataFrame一样来操作DataSet,看下边一个简单的例子。

  1. case class Person(name: String,age: Long)
  2. // 通过case class创建DataSet
  3. val caseClassDS = Seq(Person("Andy", 32)).toDS()
  4. caseClassDS.show()
  5. // +----+---+
  6. // |name|age|
  7. // +----+---+
  8. // |Andy| 32|
  9. // +----+---+
  10. // 通过基本类型创建DataSet
  11. import spark.implicits._
  12. val primitiveDS = Seq(1, 2, 3).toDS()
  13. primitiveDS.map(_ + 1).collect()
  14. // Returns: Array(2, 3, 4)
  15. // 将DataFrame转换成DataSet
  16. val path = "examples/src/main/resources/people.json"
  17. val peopleDS = spark.read.json(path).as[Person]
  18. peopleDS.show()
  19. // +----+-------+
  20. // | age| name|
  21. // +----+-------+
  22. // |null|Michael|
  23. // | 30| Andy|
  24. // | 19| Justin|
  25. // +----+-------+
  • RDD&DataFrame&DataSet异同点

共同点

1.RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利。

2.三者都有惰性机制,在进行创建、转换等阶段,如map、filter等方法时,不会立即执行,只有在遇到Action如count、reduce时,才会真正开始运算。

3.三者都会根据Spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。

4.三者有许多共同的函数,如filter、map等。

 

不同点

1.RDD不支持Sparksql操作,DataFrame与DataSet均支持Sparksql,比如select,groupby之类,还能注册临时表/视图,实现与sql语句的无缝操作。

2.DataSet和DataFrame拥有完全相同的成员函数,区别在于每一行的数据类型和字段类型是否明确。DataFrame也可以叫DataSet[Row],每一行的类型为Row,而DataSet每一行的数据类型是确定的。

DataFrame只知道字段,但无法确定字段的具体类型,所以在执行这些操作的时候是没办法在编译的时候检查类型是否匹配的,比如你可以对一个String进行减法操作,在执行的时候才会报错,而DataSet不仅仅知道字段,还知道字段类型,所以有更严格的错误检查。

3.相比于RDD,DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,且可以带上表头,这样每一列的字段名一目了然。

  • RDD&DataFrame&DataSet相互转化

RDD、DataFrame、DataSet三者有许多共性,有各自适用的场景常常需要在三者之间转换。

DataFrame/DataSet转RDD

  1. val rdd1 = testDF.rdd //testDF表示一个DataFrame
  2. val rdd2 = testDS.rdd //testDS表示一个DataSet

RDD转DataFrame

  1. import spark.implicits._
  2. val testDF = rdd.map{line=>
  3. val words = line.split(",")
  4. (words(0),words(1))
  5. }.toDF("col1","col2")

RDD转DataSet

  1. import spark.implicits._
  2. case class Coltest(col1:String,col2:Int//定义字段名和类型
  3. val testDS = rdd.map {line=>
  4. Coltest(line._1,line._2)
  5. }.toDS

DataSet转DataFrame

  1. import spark.implicits._
  2. val testDF = testDS.toDF //testDS表示一个DataSet

DataFrame转DataSet

  1. import spark.implicits._
  2. case class Coltest(col1:String,col2:Int) //定义字段名和类型
  3. val testDS = testDF.as[Coltest]

04

共享变量

一般来说,跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种共享变量:广播变量(broadcast variable)和累加器(accumulator)。

  • 为什么要将变量定义成广播变量?

广播变量允许开发人员在每个节点缓存只读的变量,而不是在任务之间传递这些变量。

实际工作中,当我们需要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的内存资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,这个Executor启动的task会共享这个变量,从而节省了通信的成本和内存资源。

  1. // 定义一个广播变量
  2. scala> val broadcastVar = sc.broadcast(Array(1,2, 3))
  3. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
  4. // 访问广播变量的值
  5. scala> broadcastVar.value
  6. res0: Array[Int] = Array(1, 2, 3)

使用广播变量的注意事项:

1.广播变量只能在Driver端定义,不能在Executor端定义。

2.在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

3.不能将一个RDD使用广播变量广播出去,因为RDD是不存储数据的。可以将RDD在Driver端collect为一个集合再广播出去。

  • 为什么要定义累加器?

在数据分析工作中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会在Driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能。

我们可以通过分别调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator() 累积Long或Double类型的值来创建数字累加器。然后,可以使用add方法对累加器进行增加。驱动程序可以使用其value方法读取累加器的值。

  1. // 定义一个累加器
  2. scala> val accum = sc.longAccumulator("MyAccumulator")
  3. accum:org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(MyAccumulator), value: 0)
  4. // 通过add方法对累加器进行增加
  5. scala> sc.parallelize(Array(1, 2, 3,4)).foreach(x => accum.add(x))
  6. ...
  7. 10/09/29 18:41:08 INFO SparkContext: Tasks finishedin 0.317106 s
  8. // 计算完成后获取累加器的最终值
  9. scala> accum.value
  10. res2: Long = 10

使用累加器的注意事项:

1.累加器在Driver端定义赋初始值,且只能在Driver端读取最后的值,在Excutor端更新。

2.在Driver端获取累计器值的时候需要一个Action操作来触发,才能拿到值。

3.累计器只能执行add操作。

05

SPARK中YARN模式提交任务方式

Spark可以和YARN整合,将Application提交到YARN上运行,YARN运行模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster。

YARN-Client提交任务方式如下:

  1. ./spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  2. 或者
  3. ./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

YARN-Cluster提交任务方式如下:

  1. ./spark-submit --master yarn-cluster  --class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  2. 或者
  3. ./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

 

  • 两种模式的区别

理解YARN-Client和YARN-Cluster之间的区别需要先介绍一下ApplicationMaster。

在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Client和YARN-Cluster模式的区别其实就是ApplicationMaster进程的区别。

YARN-Client模式下

1.ApplicationMaster仅仅向YARN请求Executor,Client会和请求到的Container进行通信来完成作业的调度和执行,Client 是不能退出的。

2.Driver 运行在 Client 端(提交 Spark 作业的机器)。

3.日志可在控制台输出,便于测试。

YARN-Cluster模式下

1.Driver 运行在ApplicationMaster 中,它负责向YARN申请资源,并监督作业的运行状况。

2.Client 只要完成提交作业后就可以关掉,因为作业已经执行在 YARN 上运行了。

3.日志在终端看不到,只能通过 yarn logs-applicationId application_id命令或者在YARN的WebUI界面中查看日志。

建议使用YARN-Cluster模式,因为这种模式下,Driver会分布在计算集群内部,而不会对作业提交端造成过多的压力和资源消耗。

06

小结

本文主要针对数据分析工作中需要了解的Spark基础知识点进行了讲解,内容主要包括SPARK与MAPREDUCE的比较、SPARK生态系统、RDD&DataFrame&DataSet、共享变量和SPARK中YARN模式提交任务方式,希望阅读完本文的小伙伴有所收获。

 

-end-

参考文献

[1]《Spark之深入理解RDD结构》作者 TccccD

https://blog.csdn.net/u011094454/article/details/78992293

[2]《Spark RDD详解》作者 通凡

https://blog.csdn.net/wangxiaotongfan/article/details/51395769

[3]《Spark快速大数据分析》作者 [美] Holden Karau  [美] Andy Konwinski [美] Patrick Wendell  [加] Matei Zaharia

[4]《Spark on YARN 的两种模式》作者  Python之简

https://blog.csdn.net/qq_1290259791/article/details/79606566

[5] 《图解Spark核心技术与案例实战》作者 郭景瞻

[6] Spark官网

http://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/785594
推荐阅读
相关标签
  

闽ICP备14008679号