赞
踩
Spark是基于内存计算的大数据并行计算框架。spark基于内存计算,提高了在大数据环境下数据处理的的实时性,同时保证了高容错性和高可伸缩性。
---<<Spark大数据处理技术,应用与性能优化>>
Spark中有很多内容,本篇文章只讲其中的Spark core ,Spark sql ,Spark stream。
RDD 是Resilient Distributed Dataset的简称。它是Apache Spark的基本数据结构。它是一个不可变的对象集合,在集群的不同节点上进行计算,可以理解为是一个数据分布在各个不同节点上的数据集合,当对它进行操作计算时,实际上是对各个节点上的数据进行计算。
Resilient:即在RDD lineage(DAG)的帮助下具有容错能力,能够重新计算由于节点故障而丢失或损坏的数据分区。
Distributed:数据分布在多个节点上。
Dataset:表示所操作的数据集。用户可以通过JDBC从外部加载数据集,数据集可以是JSON文件、CSV文件、文本文件或数据库
RDD的特点:
1、内存计算:它将中间计算结果存储在分布式内存(RAM)中,而不是磁盘中。
2、延迟计算:Apache Spark中的所有transformation都是惰性的,因为它们不会立即计算结果,它们会记住应用于数据集的那些transformation。直到action出现时,才会真正开始计算。
3、容错性:Spark RDDs能够容错,因为它们跟踪数据沿袭信息,以便在故障时自动重建丢失的数据。
4、不可变性:你可以通过对RDD计算得到新的RDD,但是无法改变现有RDD内的数据。跨进程共享数据是安全的。它也可以在任何时候创建或检索,这使得缓存、共享和复制变得容易。因此,它是一种在计算中达到一致性的方法。
5、分区性:partition是Spark RDD中并行性的基本单元,每个分区都是数据的逻辑分区。Partition—task一 一对应
6、持久化:用户可以声明他们将重用哪些RDDs,并为它们选择存储策略。
7、数据本地性:RDDs能够定义计算分区的位置首选项。位置首选项是关于RDD位置的信息。
RDD的操作:
RDD的操作分为两种,一种是transformation操作,一种是action操作。
transformation:
得到的结果还是一个RDD,都是延迟操作的函数,如:map(), filter(), reduceByKey()。
transformation有两种类型:窄变换、宽变换(窄依赖、宽依赖)。
窄变换:它是map、filter这样数据来自一个单独的分区的操作。即输出RDD分区中的数据,来自父RDD中的单个分区。
宽变换:在子RDD单个分区中计算结果所需的数据可能存在于父RDD的多个分区中。类似groupByKey()和reduceByKey()这样的transformation。宽依赖也称为shuffle transformation。
action:
得到的结果是计算的最终结果,其结果是一个值,而不是一个RDD。
action触发血缘关系中RDD上的transformation操作的真正计算,计算结果返回Driver端或者写入数据库。
常见的Action:first(), take(), reduce(), collect(), the count() 。
RDD创建方法:
RDD的分区概念:
park自动对RDDs中的大量数据元素进行分区,并在worker节点之间分配分区,计算。分区是逻辑上的。
属性 | 描述 |
partitions | 返回包含RDD所有分区引用的一个数组 |
partitions.size | 返回RDD的分区数量 |
partitioner | 返回下列分区器之一: NONE HashPartitioner, RangePartitioner, CustomPartitioner. |
通过partitionBy()方法可以设置分区器
NONE:分区不是基于数据的特性,但是分布是随机的,并且保证在节点之间是均匀的。
HashPartitioner:HashPartitioner基于Java的Object.hashcode()方法进行分区
RangePartitioner:如果有可排序的记录,那么范围分区将几乎在相同的范围内划分记录。范围Range是通过采样传入RDD的数据内容来确定的。首先,RangePartitioner将根据key对记录进行排序,然后根据给定的值将记录划分为若干个分区。
CustomPartitioner:还可以通过扩展Spark中的默认分区器类来定制需要的分区数量和应该存储在这些分区中的内容。
有多少分区是合适的:
分区数量太少、数量太多都有一定的优点和缺点。因此,建议根据集群配置和需求进行明智的分区。
Core—partition--task
分区太少的缺点:
减少并发性——您没有使用并行性的优点。可能存在空闲的worker节点。
数据倾斜和不恰当的资源利用——数据可能在一个分区上倾斜,因此一个worker可能比其他worker做得更多,因此可能会出现资源问题。
分区太多的缺点:
任务调度可能比实际执行时间花费更多的时间。
因此,在分区的数量之间存在权衡。推荐如下:
广播变量Broadcast Variables:
广播变量允许Spark程序员将只读变量缓存在每台机器上,而不是将它的副本与task一起发送出去。例如,可以使用广播变量以高效的方式为每个worker节点提供一个大型输入数据集的副本。广播变量是只读的,对每个worker节点只需要传输一次。这样,就从每个task一份变量副本,变成了一个executor一个变量副本,executor中执行的task共用这个副本。如果有多个worker节点,各个worker上的executor中的变量副本并不都是来自driver,因为Spark采用了高效的广播算法(TorrentBroadcast)来分配广播变量,以降低通信成本。
累加器Accumulator:
顾名思义,累加器是只能累加的变量。在task中只能对累加器进行添加数值,而不能获取累加器的值。累加器的值只能在driver端获取。
注意使用累加器的陷阱:
累加器和广播变量的不同:
广播变量一般用于Spark程序调优,如果不使用广播变量,程序计算结果不会错误,只是性能可能低下。
累加器不同,如果应该使用累加器的场景,你不使用,此时,程序计算结果就是错误的。
Spark应用程序调度过程:
Driver会向master申请资源,master收到请求之后,会向worker进行资源调度,启动Executor,然后,Executor向Driver进行注册。
此时Spark 应用程序就会知道哪些worker上面的executor已经就绪。
接着开始执行spark任务:
遇到action操作à创建一个job—提交给DAGScheduler, DAGScheduler会把job分为多个stages(shuffle:最后一个stage里面的task叫ResultTask,前面stage里面的task叫shuffleMapTask。),为每个stage创建一个taskset集合,集合中的task计算逻辑完全相同,只是处理的数据不同。Task的数量等于partition的数量,但是同时执行的task的数量等于集群core的数量。整个Application运行完成时间,等于最后一个执行完成的task的时间。---数据倾斜
然后DAGScheduler会把taskset交给TaskScheduler,TaskScheduler会把taskset里面的task发送给Executor。Executor接收到task,会启动一个线程池TaskRunner,在里面运行task。
Spark调优:
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
filter算子之后使用coalesce算子
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
Java中,有三种类型比较耗费内存:
• 自定义对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
• 字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
• 集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。注意:前提是保证代码可运行,易维护。
方法一、过滤引起数据倾斜的key
方法二、提高shuffle操作的并行度:
执行RDD shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(100),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。
此方法虽然实现简单,但是治标不治本。比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此还是会发生数据倾斜的。
方法三、对数据倾斜key使用随机数,实现两阶段聚合(即利用在key前缀或后缀添加随机数,将原本相同key的数据打散)
方法四、将hash shuffle join转换为map join
方法五、使用partitioner优化hash shuffle join:
为了对两个RDD中的数据进行join,Spark需要将两个RDD上的数据拉取同一个分区。Spark中join的默认实现是shuffled hash join:通过使用与第一个数据集相同的默认分区器对第二个数据集进行分区,从而确保每个分区上的数据将包含相同的key,从而使两个数据集具有相同哈希值的键位于同一个分区中。虽然这种方法总是可以运行的,但是此种操作比较耗费资源,因为它需要一次shuffle。
如果两个RDDs都有一个已知的分区器,则可以避免shuffle,如果它们有相同的分区器,则数据可能被本地合并;避免网络传输,因此,建议在join两个RDD之前,调用partitionby方法,并且使用相同的分区器。样例代码如下:
- val partitioner=new HashPartitioner(10)
- agesRDD.partitionBy(partitioner)
- addressRDD.partitionBy(partitioner)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。