当前位置:   article > 正文

编程实现将rdd转换为dataframe:源文件内容如下(_Spark Core 解析:RDD

编程实现将rdd转换为dataframe 源文件内容如下(包含id,name,age): 请将数据复制保

05e35d65eb05b11a6015a44e20f4c709.png

引言

Spark Core是Spark的核心部分,是Spark SQL,Spark Streaming,Spark MLlib等等其他模块的基础, Spark Core提供了开发分布式应用的脚手架,使得其他模块或应用的开发者不必关心复杂的分布式计算如何实现,只需使用Spark Core提供的分布式数据结构RDD及丰富的算子API,以类似开发单机应用的方式来进行开发。

e512a3c7c10082e888164a9e173fc397.png

图中最下面那个就是Spark Core啦,日常使用的RDD相关的API就属于Spark Core,而Dataset、DataFrame则属于Spark SQL。

RDD 概览

RDD是Spark Core的用户级API,了解RDD是了解Spark Core的第一步,本文基于Spark 2.x,主要对RDD的特点和组成进行分析。

定义

RDD (Resilient Distributed Dataset,弹性分布式数据集):

  • Resilient:不可变的、容错的
  • Distributed:数据分散在不同节点(机器,进程)
  • Dataset:一个由多个分区组成的数据集

特征

In-Memory:RDD会优先使用内存Immutable(Read-Only):一旦创建不可修改Lazy evaluated:惰性执行Cacheable:可缓存,可复用Parallel:可并行处理Typed:强类型,单一类型数据Partitioned:分区的Location-Stickiness:可指定分区优先使用的节点

是Spark中最核心的数据抽象,数据处理和计算基本都是基于RDD。

组成

一个RDD通常由5个要素组成:

  • 一组分区(partition)
  • 一个计算函数
  • 一组依赖(直接依赖的父RDD)
  • 一个分区器 (可选)
  • 一组优先计算位置(e.g. 将Task分配至靠近HDFS块的节点进行计算) (可选)

与传统数据结构对比,只关心访问,不关心存储。通过迭代器访问数据,只要数据能被不重复地访问即可。后面会详细分析各要素。

算子

算子,即对RDD进行变换的操作,按照是否触发Job提交可以分为两大类:

  • transformation:不会立即执行的一类变换,不会触发Job执行,会生成并返回新的RDD,同时记录下依赖关系。如:map,filter,union,join,reduceByKey。
  • action: 会立即提交Job的一类变换,不会返回新的RDD,而是直接返回计算结果。如:count,reduce,foreach。

a9f963ffecc83595f53dcbea8cbecb9f.png

下面对RDD的组成要素进行分析

Partition & Partitioner

为什么要把数据分区?把数据分成若干partition是为了将数据分散到不同节点不同线程,从而能进行分布式的多线程的并行计算。

按什么规则分区?RDD从数据源生成的时候,数据通常是随机分配到不同的partition或者保持数据源的分区,如sc.parallelize(…),sc.textFile(…)。

这对于某些RDD操作来说是没有问题的,比如filter(),map(),flatMap(),rdd.union(otherRDD),rdd.intersection(otherRDD),rdd.subtract(otherRDD)。

但是对于reduceByKey(),foldByKey(),combineByKey(),groupByKey(),sortByKey(),cogroup(), join() ,leftOuterJoin(), rightOuterJoin()这些操作,随机分配分区就非常不友好,会带来很多额外的网络传输。影响一个分布式计算系统性能的最大敌人就是网络传输,所以必须尽量最小化网络传输。

为了减少网络传输,怎么分区才合理?对于reduceByKey操作应该把相同key的数据放到同一分区;对于sortByKey操作应该把同一范围的数据放到同一分区。

可见不同的操作适合不同的数据分区规则,Spark将划分规则抽象为Partitioner(分区器) ,分区器的核心作用是决定数据应归属的分区,本质就是计算数据对应的分区ID。

在Spark Core中内置了2个Partitioner来支持常用的分区规则(Spark MLlib,Spark SQL中有其他的)。

  • HashPartitioner 哈希分区器
  • RangePartitioner 范围分区器

HashPartitioner

哈希分区器是默认的分区器,也是使用最广泛的一个,作用是将数据按照key的hash值进行分区。

分区ID计算公式非常简单:key的hash值 % 分区个数 , 如果key为null,则返回0.

也就是将key的hash值(Java中每个对象都有hash code,对象相等则hash code相同),除以分区个数,取余数为分区ID,这样能够保证相同Key的数据被分到同一个分区,但是每个分区的数据量可能会相差很大,出现数据倾斜。

RangePartitioner

RangePartitioner的作用是根据key,将数据按范围大致平均的分到各个分区,只支持能排序的key。

要知道一个key属于哪个分区,需要知道每个分区的边界值。确定边界值需要对数据进行排序,因为数据量通常较大,通过样本替代总体来估计每个分区的边界值。

采样流程:

  • 1. 使用水塘抽样对总体进行采样;
  • 2. 针对数据量远超平均值的分区,进行传统抽样(伯努利抽样)。

使用场景:sortByKey

如何使用

对于一个没有明确指定Partitioner的情况下,reduceByKey(),foldByKey(),combineByKey(),groupByKey()等操作会默认使用HashPartitioner。sortByKey操作会采用RangePartitioner。

reduceByKey也有一个可以自定义分区器的版本:reduceByKey(partitioner: Partitioner, func: (V, V) => V)

Function

传入给transformation的函数

transformation会生成新的RDD,传给RDD transformation的函数最终会以成员变量的形式存储在新生成的RDD中。

以map函数为例。

val r11 = r00.map(n => (n, n))

map函数接受的参数类型为f: T => U,因为Scala支持函数式编程,函数可以像值一样存储在变量中,也可以作为参数传递。f参数的类型T => U代表一种函数类型,这个函数的输入参数的类型必须为T,输出类型为U,这里T和U都是泛型,T代表RDD中数据的类型,对于RDD[String]来说,T就是String。

最终f参数,会转换成有关迭代器的一个函数,存储到RDD的f成员变量中。

最终存储的类型为:f: (TaskContext, Int, Iterator[T]) => Iterator[U]对于map来说是这样一个函数(context, pid, iter) => iter.map(f)也就是说我们传入到RDD.map的f函数,最终传给了Iterator.map函数。

传入给action的函数

action不会生成新的RDD,而是将函数传递给Job。

Dependency

当RDD1经过transformation生成了RDD2,就称作RDD2依赖RDD1,RDD1是RDD2的父RDD,他们是父子关系。

先看一个例子

  1. val r00 = sc.parallelize(0 to 9)
  2. val r01 = sc.parallelize(0 to 90 by 10)
  3. val r10 = r00 cartesian r01
  4. val r11 = r00.map(n => (n, n))
  5. val r12 = r00 zip r01
  6. val r13 = r01.keyBy(_ / 20)
  7. val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)

我们看下RDD之间的依赖关系图

64fc430dbc27b5b96e32c0a537f96538.png

RDD的依赖关系网又叫RDD的血统(lineage),可以看做是RDD的逻辑执行计划。

Dependency存储

父RDD与子RDD之间的依赖关系记录在子RDD的属性中(deps: Seq[Dependency[_]]),数据类型为Dependency(可以有多个),Dependency中保存了父RDD的引用,这样通过Dependency就能找到父RDD。

Dependency分类

Dependency不仅描述了RDD之间的依赖关系,还进一步描述了不同RDD的partition之间的依赖关系。

依据partition之间依赖关系的不同Dependency分为两大类:

  • NarrowDependency 窄依赖,1个父分区只对应1个子分区,这时父RDD不需要改变分区方式。如:map、filter、union,co-paritioned join
  • ShuffleDependency Shuffle依赖(宽依赖),1个父分区对应多个子分区,这种情况父RDD必须重新分区,才能符合子RDD的需求。如:groupByKey、reduceByKey、sortByKey,(not co-paritioned)join

NarrowDependency

NarrowDependency是一个抽象类,一共有3中实现类,也就是说有3种NarrowDependency。

  • OneToOneDependency:一对一依赖,比如map,
  • RangeDependency:范围依赖,如 union
  • PruneDependency:裁剪依赖,过滤掉部分分区,如PartitionPruningRDD

5e50016144d65283800ab7a4417aa538.png

c53f7114b31f4f9161a93b53f90c782a.png

ShuffleDependency

出现shuffle依赖表示父RDD与子RDD的分区方式发生了变化。

fe36bd60e93c1285513ce1c672e3b75b.png

RDD分类

RDD的具体实现类有几十种(大概60+),介绍下最常见的几种。

  1. scala> r20.toDebugString
  2. res34: String =
  3. (28) UnionRDD[38] at union at <pastie>:31 []
  4. | UnionRDD[37] at union at <pastie>:31 []
  5. | UnionRDD[36] at union at <pastie>:31 []
  6. | CartesianRDD[32] at cartesian at <pastie>:27 []
  7. | ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
  8. | ParallelCollectionRDD[31] at parallelize at <pastie>:26 []
  9. | MapPartitionsRDD[33] at map at <pastie>:28 []
  10. | ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
  11. | ZippedPartitionsRDD2[34] at zip at <pastie>:29 []
  12. | ParallelCollectionRDD[30] at parallelize at <pastie>:25 []
  13. | ParallelCollectionRDD[31] at parallelize at <pastie>:26 []
  14. | MapPartitionsRDD[35] at keyBy at <pastie>:30 []
  15. | ParallelCollectionRDD[31] at parallelize at <pastie>:26 []

不同的RDD代表着不同的‘计算模式’:MapPartitionsRDD,对Iterator的每个值应用相同的函数;

ShuffledRDD,对Iterator执行combineByKey的模式,可以指定 createCombiner: V => C,mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, compute函数返回ShuffleReader生成的迭代器。

MapPartitionsRDD

MapPartitionsRDD对于父RDD的依赖类型只能是OneToOneDependency,代表将函数应用到每一个分区的计算。

相关transformation:map, flatMap, filter, mapPartitions等等

  1. scala> sc.parallelize(0 to 10000).map(x=>(x%9,1)).dependencies
  2. res35: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@7c6843f)

ShuffledRDD

对于父RDD的依赖类型只能是ShuffleDependency,代表需要改变分区方式进行shuffle的计算。

会创建ShuffledRDD的transformation:RDD:coalescePairRDDFunctions: reduceByKey, combineByKeyWithClassTag , partitionBy (分区方式不同时) 等OrderedRDDFunctions: sortByKey, repartitionAndSortWithinPartitions

RDD Checkpoint

Checkpoint检查点,是一种截断RDD依赖链,并把RDD数据持久化到存储系统(通常是HDFS或本地)的过程。主要作用是截断RDD依赖关系,防止stack overflow(与DAG递归调用有关)。存储的数据包括RDD计算后的数据和partitioner。

Checkpoint分为两种:

  • reliable :调用函数为RDD.checkpoint(),数据保存到可靠存储HDFS,RDD的parent替换为ReliableCheckpointRDD;
  • local:调用函数为RDD.localCheckpoint(),数据保存到spark cache中(不是本地),RDD的parent替换为LocalCheckpointRDD。当executor挂掉,数据会丢失。

注意:与streaming中的checkpointing不同,streaming中的checkpointing会同时保存元数据和RDD数据,可以用于Application容错。

如何使用

  1. scala> :paste
  2. // Entering paste mode (ctrl-D to finish)
  3. val a=sc.parallelize(0 to 9)
  4. val b=a.map(_*10)
  5. val c=b.filter(_>10)
  6. // Exiting paste mode, now interpreting.
  7. a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
  8. b: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
  9. c: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:26
  10. scala> c.toDebugString
  11. res0: String =
  12. (4) MapPartitionsRDD[2] at filter at <console>:26 []
  13. | MapPartitionsRDD[1] at map at <console>:25 []
  14. | ParallelCollectionRDD[0] at parallelize at <console>:24 []
  15. scala> sc.setCheckpointDir("/tmp/spark-checkpoint")
  16. scala> b.checkpoint
  17. scala> b.count
  18. res4: Long = 10
  19. scala> c.toDebugString
  20. res5: String =
  21. (4) MapPartitionsRDD[2] at filter at <console>:26 []
  22. | MapPartitionsRDD[1] at map at <console>:25 []
  23. | ReliableCheckpointRDD[3] at count at <console>:26 []
  24. scala> b.toDebugString
  25. res6: String =
  26. (4) MapPartitionsRDD[1] at map at <console>:25 []
  27. | ReliableCheckpointRDD[3] at count at <console>:26 []
  28. //local
  29. scala> c.localCheckpoint
  30. scala> c.count
  31. res9: Long = 8
  32. scala> c.toDebugString
  33. res10: String =
  34. (4) MapPartitionsRDD[2] at filter at <console>:26 [Disk Memory Deserialized 1x Replicated]
  35. | CachedPartitions: 4; MemorySize: 104.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
  36. | LocalCheckpointRDD[4] at count at <console>:26 [Disk Memory Deserialized 1x Replicated]

查看HDFS上存储的checkpoint文件

  1. hdfs dfs -ls /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1
  2. Found 4 items
  3. -rw-r--r-- 2 ld-liuyuan_su hdfs 91 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00000
  4. -rw-r--r-- 2 ld-liuyuan_su hdfs 101 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00001
  5. -rw-r--r-- 2 ld-liuyuan_su hdfs 91 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00002
  6. -rw-r--r-- 2 ld-liuyuan_su hdfs 101 2019-10-09 08:40 /tmp/spark-checkpoint/74acd422-2693-4f47-b786-69b4f8dc33ad/rdd-1/part-00003

RDD Cache

Cache机制是Spark提供的一种将数据缓存到内存(或磁盘)的机制,主要用途是使得中间计算结果可以被重用。

常见的使用场景有如下几种,底层都是调用RDD的cache,这里只讲RDD的cache。

  1. rdd.cache()
  2. dataset.cache()
  3. spark.sql("cache table test.test")
  4. ...

Spark的Cache不仅能将数据缓存到内存,也能使用磁盘,甚至同时使用内存和磁盘,这种缓存的不同存储方式,称作‘StorageLevel(存储级别)’。

可以这样使用:rdd.persist(StorageLevel.MEMORY_ONLY)

Spark目前支持的存储级别如下:

  1. NONE (default)
  2. DISK_ONL
  3. DISK_ONLY_2
  4. MEMORY_ONLY (cache操作使用的级别)
  5. MEMORY_ONLY_2
  6. MEMORY_ONLY_SER
  7. MEMORY_ONLY_SER_2
  8. MEMORY_AND_DISK
  9. MEMORY_AND_DISK_2
  10. MEMORY_AND_DISK_SER
  11. MEMORY_AND_DISK_SER_2
  12. OFF_HEAP

2代表存储份数为2,也就是有个备份存储。SER代表存储序列化后的数据。

DISK_ONLY后面没跟SER,但其实只能是存储序列化后的数据。

要cache RDD,常用到两个函数, cache()persist(),cache方法本质上是persist(StorageLevel.MEMORY_ONLY),也就是说persist可以指定StorageLevel,而cache不行。

Checkpoint vs Cache

  • Cache用于缓存,采用临时保存,Executor挂掉会导致数据丢失,但是数据可以重新计算。
  • Checkpoint用于截断依赖链,reliable方式下Executor挂掉不会丢失数据,数据一旦丢失不可恢复。

使用的时候要想清楚目的,就不会用错啦。

RDD Broadcast

一种将数据在不同节点间共享的机制,可以将指定的只读数据广播分发到每个Executor,每个Executor有一份完整的备份。

是一种高效的数据共享机制,被广播的数据可以被不同的stage和task共享,而不需要给每个task拷贝一份。

Broadcast机制有个非常重要的作用,Spark就是通过它将task分发给各个Executor。

下面举个使用的例子

rddA

rddB

rddAB

k

low

up

1

a

A

2

b

B

3

c

C

  1. scala> val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
  2. rddA: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[5] at parallelize at <console>:24
  3. scala> val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
  4. rddB: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[6] at parallelize at <console>:24
  5. scala> val rddAB=rddA.join(rddB)
  6. rddAB: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[9] at join at <console>:27
  7. scala> rddAB.collect
  8. res11: Array[(Int, (String, String))] = Array((1,(a,A)), (2,(b,B)), (3,(c,C)))
  9. scala> rddAB.toDebugString
  10. res12: String =
  11. (4) MapPartitionsRDD[9] at join at <console>:27 []
  12. | MapPartitionsRDD[8] at join at <console>:27 []
  13. | CoGroupedRDD[7] at join at <console>:27 []
  14. +-(4) ParallelCollectionRDD[5] at parallelize at <console>:24 []
  15. +-(4) ParallelCollectionRDD[6] at parallelize at <console>:24 []
  16. scala> val rddBMap=sc.broadcast(rddB.collectAsMap)
  17. rddBMap: org.apache.spark.broadcast.Broadcast[scala.collection.Map[Int,String]] = Broadcast(9)
  18. scala> val rddABMapJoin= rddA.map{case(k,v) => (k,(v,rddBMap.value.get(k).get))}
  19. rddABMapJoin: org.apache.spark.rdd.RDD[(Int, (String, String))] = MapPartitionsRDD[10] at map at <console>:27
  20. scala> rddABMapJoin.collect
  21. res13: Array[(Int, (String, String))] = Array((1,(a,A)), (2,(b,B)), (3,(c,C)))
  22. scala> rddABMapJoin.toDebugString
  23. res14: String =
  24. (4) MapPartitionsRDD[10] at map at <console>:27 []
  25. | ParallelCollectionRDD[5] at parallelize at <console>:24 []

通过broadcast机制,将原本的两个stage计算减少为1个stage。这里模拟实现了map-side join。

Broadcast VS Cache

Cache也会把数据分发到各个节点,但是一个节点上通常只有部分分区的数据,而Broadcast会保证每个节点都有完整的数据。Broadcast会消耗更多的内存,但是带来了更好的性能。

RDD Accumulators

Broadcast机制有个短板,它的变量是只读的,于是Spark提供了Accumulators(累加器)来弥补。

Accumulator的值可以增减,但是不能直接修改为指定值。

  1. scala> val acc=sc.longAccumulator
  2. acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 200, name: None, value: 0)
  3. scala> rddA.map(_=>acc.add(-1)).count
  4. res15: Long = 3
  5. scala> acc.value
  6. res17: Long = -3

参考

Spark RDDs Simplified

Understanding Spark Partitioning

Checkpointing

Spark内核设计的艺术

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/389332
推荐阅读
相关标签
  

闽ICP备14008679号