当前位置:   article > 正文

简述Spark基础及架构_spark架构

spark架构

一、spark简介

spark是基于内存的分布式计算框架,特点是快速、易用、通用及多种运行模式。

  • 快速:
    基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试)
    基于硬盘数据处理,比MR快10个数量级以上

  • 易用:
    支持Java、Scala、Python、R语言
    交互式shell方便开发测试

  • 通用性:
    一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习

  • 多种运行模式:
    YARN、Mesos、EC2、Kubernetes、Standalone、Local

二、spark技术栈

  • Spark Core: 核心组件,分布式计算引擎
  • Spark SQL:: 高性能的基于Hadoop的SQL解决方案
  • Spark Streaming: 可以实现高吞吐量、具备容错机制的准实时流处理系统
  • Spark GraphX: 分布式图处理框架
  • Spark MLlib: 构建在Spark上的分布式机器学习库

三、spark架构

spark架构主要由以下组件构成:

  • Application: 建立在 Spark上的用户程序,包括 Driver代码和运行在集群各节点 Executor中的代码
  • Driver program: 驱动程序, Application中的main函数并创建 SparkContext
  • Cluster Manager : 在集群(Standalone、Mesos、YARN)上获取资源的外部服务
  • Worker Node: 集群中任何可以运行 Application代码的节点
  • Executor : 某个 Application运行在 worker节点上的一个进程
  • Task: 被送到某个 Executor上的工作单元
  • Job : 包含多个 Task 组成的并行计算,往往由 Spark Action算子 触发生成,一个 Application中往往会产生多个 Job
  • Stage: 每个Job会被拆分成多组 Task,作为一个 TaskSet,其名称为 Stage

运行架构:
在这里插入图片描述

  • 在驱动程序中,通过SparkContext主导应用的执行
  • SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor
  • 一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整
  • 每个应用获取自己的Executor
  • 每个Task处理一个RDD分区
spark运行流程:

在这里插入图片描述

spark 中 masterworkerdriverexecutor 之间的关系

在这里插入图片描述

四、saprk常用API

在开发过程中,常用API主要有: SparkContext、 SparkSession、 RDD、 DataSet及 DataFrame,本文主要介绍 SparkContext、 SparkSession。

4.1 SparkContext

  • 是Spark的主入口
  • 连接Driver与Spark Cluster(Workers)
  • 每个JVM仅能有一个活跃的SparkContext

在IDEA中创建 SparkContext ,代码如下:

//导包
import org.apache.spark.{SparkConf, SparkContext}
//创建一个SparkContext对象
val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark")
val sc=SparkContext.getOrCreate(conf)
  • 1
  • 2
  • 3
  • 4
  • 5

4.2 SparkSession

  • SparkSession是 SparkSQL的入口 ,是在 2.0中引入的新的 API,旨在为 Spark编程提供统一的编程入口,SparkSession整合了 SparkConf、 SparkContext、SQLContext、 HiveContext以及 StreamingContext。
  • 当创建了SparkSession对象后,可以间接拿到 sparkContextsqlContext 对象,所以在 2.0版本后推荐使用 SparkSession作为编程入口。
  • 在2.0之前的 Spark版本中, spark shell会自动创建一个 SparkContext对象(sc)
  • 在2.0+版本中,Spark shell则会额外创建一个 SparkSession对象(spark),如下图:
    在这里插入图片描述

在IDEA中创建 SparkSession ,代码如下:

//导包
import org.apache.spark.sql.SparkSession
//创建一个SparkSession对象
val spark = SparkSession.builder
.master("local[2]")
.appName("appName")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

五、spark数据核心–RDD

5.1 RDD概念

RDD称为 弹性分布式数据集( Resilient Distributed Datasets),它是一种分布式的内存抽象,允许在大型集群上执行基于内存的计算( In Memory Computing),为用户屏蔽了底层复杂的计算和映射环境。

弹性: 指在任何时候都能进行重算,这样当集群中的一台机器挂掉而导致存储在其上的RDD丢失后,Spark还可以重新计算出这部分的分区的数据

分布式: 数据计算分布于多节点

数据集: RDD并不存储真正的数据,只是对数据和操作的描述。它是只读的、分区记录的集合,每个分区分布在集群的不同节点上

简单来说,RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中,并执行正确的操作 。

更规范的解释是:

  • RDD是用于数据转换的接口 ,比如 map、 filter、 groupBy、 join等。
  • RDD指向了存储在 HDFS、 Cassandra、 HBase等、或缓存(内存、内存 +磁盘、仅磁盘等),或在故障或缓存收回时重新计算其他 RDD分区中的数据 。从
    这个意义上讲, RDD不包含任何待处理数据。

5.2 RDD的五大特性

5.2.1 分区(Partition)

  • RDD是由多个分区构成的( 使用RDD#partitions返回 RDD的所有分区信息),每个Partition都有一个唯一索引编号 (使用Partition#index访问)
  • RDD分区概念与MapReduce的输入切片概念是类似的,对每个分区的运算会被一个当作一个Task执行。举例:如果有100个分区,那么RDD上有 n 个操作将会产生有 n*100 个任务。

5.2.2 compute函数

每个分区上都有compute函数,计算该分区中的数据

5.2.3 RDD依赖(DAG)

RDD有依赖性,通常情况下一个 RDD是来源于另一个 RDD,这个叫做 lineage。RDD会记录下这些依赖,方便容错。也称 DAG

5.2.4 分区器(Partitioner)

只有 Key-Value 类型的 RDD才有分区器 ,可以传递一个自定义的 Partitioner 进行重新分区,非 Key-Value类型的 RDD(PairRDD)分区器的值是 None。
不同的 RDD的compute函数逻辑各不一样,比如:

  • MapPartitionsRDD的compute是将用户的转换逻辑作用到指定的Partition上。因为 RDD的map算子产生MapPartitionsRDD,而 map算子的参数(具体操作逻辑)是变化的。
  • HadoopRDD的compute是读取指定Partition数据。因为sc.hadoopFile(“path”)”读取 HDFS文件返回的RDD具体类型便是 HadoopRDD,所以只需要读取数据即可。
  • CheckpointRDD的compute是直接读取检查点的数据。一旦 RDD进行checkpoint,将变成CheckpointRDD

5.2.5 分区优先位置列表

该列表存储了存取每个分区的优先位置 。对于一个 HDFS文件来说,这个列表保存了每个分区所在的数据块的位置。按照 “移动数据不如移动计算的” 的理念, Spark在进行任务调度的时候,会尽可能的将计算任务移动到所要处理的数据块的存储位置

六、创建RDD

6.1 使用"集合"创建RDD

通过集合创建RDD有两种方法:parallelizemakeRDD
makeRDD多一个重载方法:重载分配一系列本地Scala集合形成一个RDD,可以为每个集合对象创建一个分区,并指定优先位置便于在运行中优化调度。
使用本地集合创建RDD的问题在于:由于这种方法需要用到一台机器中集合的全部数据,所以这种方式在测试和原型构造之外很少使用,一般在测试时使用

使用 parallelize 创建RDD:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo extends App {
  //创建一个spark context对象
  val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkTest")  //Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  //创建rdd1,不指定分区
   val rdd1: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8))
  println(rdd1.partitions.size)  //控制台打印:2
  //创建rdd2,指定分区:5
  private val rdd2: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6),5)
  println(rdd2.partitions.size)   //控制台打印:5
  
  //关闭资源
  sc.stop()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

6.2 通过"加载"创建RDD

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SparkSessionDemo extends App {
  //创建一个spark session对象
   val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("sparkSessionTest")
    .getOrCreate()
   val sc: SparkContext = spark.sparkContext
   
  //加载本地文件
 val distFile=sc.textFile("file:///val distFile=sc.textFile("file:///home/hadoop/data/hello.txt")")

//加载hdfs文件
val distHDFSFile=sc.textFile("hdfs://hadoop000:9000/hello.txt")

  //关闭资源
  sc.stop()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

注:

  • 加载“file://……”时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本
  • Spark默认访问HDFS,为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多
  • 加载路径支持目录、压缩文件以及通配符:
sc.textFile("/my/directory")
sc.textFile("/my/directory/*.txt")
sc.textFile("/my/directory/*.gz")
  • 1
  • 2
  • 3

6.3 创建PairPDD

SparkContext.wholeTextFiles(): 可以针对一个目录中的大量小文件返回<filename,fileContent>作为PairRDD。Spark 为包含键值对类型的 RDD 提供了一些专有的操作,比如:reduceByKey()、groupByKey()……
也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3)))

示例: IDEA src的data目录中有两个文件:hello.txt、test01.txt
内容如下:
在这里插入图片描述
在这里插入图片描述

创建一个pairPDD读取数据:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CreatRDDDemo extends App{
  //创建一个spark context对象
  val conf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkTest")
  val sc:SparkContext = SparkContext.getOrCreate(conf)
  //加载文件
  val pairRDD:RDD[(String,String)] = sc.wholeTextFiles("file:///D:\\work\\date\\2020\\spark0904\\src\\data")
  pairRDD.foreach(println)
  sc.stop()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

打印结果如下图:
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/772362
推荐阅读
相关标签
  

闽ICP备14008679号