赞
踩
勤奋踏实,诚实守信。我始终相信我读过的所有书都不会白读,它总会在未来日子的某一个场合帮助我表现得更出色,读书是可以给人以力量的,它更能给人快乐。
传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。
RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。接下来,本章将针对RDD进行详细讲解。
Spark为RDD提供了两个重要的机制,分别是持久化机制(即缓存机制)和容错机制。接下来,本节将针对持久化机制和容错机制进行详细介绍。
存储级别 | 相关说明 |
---|---|
MEMORY_ONLY | 默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算 |
MEMORY_AND_DISK | 将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取 |
MEMORY_ONLY_SER | 将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时,更占CPU |
MEMORY_AND_DISK_SER | 与MEMORY_ONLY_SER类似,但是将当内存放不下则溢出到磁盘,而不是每次需要时重新计算它们 |
DISK_ONLY | 仅将RDD分区全部存储到磁盘上 |
MEMORY_ONLY_2MEMORY_AND_DISK_2 | 与上面的级别相同。若加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上 |
OFF_HEAP(实验性) | 与MEMORY_ONLY_SER类似,但将数据存储在堆外内存中(这需要启用堆外内存) |
为了大家更好地理解,接下来,通过代码演示如何使用persist()方法和cache()方法对RDD进行持久化。
scala> import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel scala> val list=List("hadoop","spark","hive") list: List[String] = List(hadoop, spark, hive) scala> val listRDD=sc.parallelize(list) listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:29 scala> listRDD.persist(StorageLevel.DISK_ONLY) res0: listRDD.type = ParallelCollectionRDD[0] at parallelize at <console>:29 scala> println(listRDD.count()) 3 scala> println(listRDD.collect().mkString(",")) hadoop,spark,hive
listRDD.persist(StorageLevel.DISK_ONLY)设置了持久化级别,但是这行代码并不会马上执行持久化操作,只有当第一次调用行动算子后(println(listRDD.count())),才会去挂靠持久化,而第2次调用算子时,使用已经产生的持久化RDD,速度会比之前快。
scala> val list=List("hadoop","spark","hive")
list: List[String] = List(hadoop, spark, hive)
scala> val listRDD=sc.parallelize(list)
listRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:29
scala> listRDD.cache()
res3: listRDD.type = ParallelCollectionRDD[1] at parallelize at <console>:29
scala> println(listRDD.count())
3
scala> println(listRDD.collect().mkString(","))
hadoop,spark,hive
当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式和设置检查点(checkpoint)方式。
血统(Lineage)方式,主要是根据RDD之间的依赖关系对丢失数据的RDD进行数据恢复。如果丢失数据的子RDD在进行窄依赖运算r则只需要把丢失数据的父RDD的对应分区进行重新计算即可,不需要依赖其他的节点,并且在计算过程中不会存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD的所有分区都要进行从头到尾的计算,在计算过程中会存在冗余计算。为了解决宽依赖运算中出现的计算冗余问题,Spark又提供了另一种方式进行数据容错,即设置检查点(checkpoint)方式。
设置检查点(checkPoint)方式,本质上是将RDD写入磁盘进行存储。当RDD在进行宽依赖运算时,只需要在中间阶段设置一个检查点进行容错,即通过Spark中的sparkContext对象调用setCheckpoint()方法,设置一个容错文件系统目录(如HDFS)作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行高可用的持久化存储,若是后面有节点出现宕机导致分区数据丢失,则可以从做检查点的RDD开始重新计算即可,不需要进行从头到尾的计算,这样就会减少开销。
RDD(Resilient Distributed Dataset,弹性分布式数据集)是Spark中的核心概念之一。RDD具有分布式和不可变的特性,能够在集群中进行并行计算。RDD机制通过持久化机制和容错机制来保证数据的可靠性和高效性。
持久化机制是指RDD可以将计算结果缓存在内存中,以便在后续的计算中复用,减少了数据的重复计算和IO开销。RDD提供了多种持久化级别,包括内存和磁盘,用户可以根据实际需求选择不同的级别。
容错机制是指RDD具有自动容错和恢复能力。RDD通过将数据划分成一系列的分区(partitions)来实现容错。每个分区都会在集群中的不同节点上进行备份,一旦某个节点发生故障,Spark可以自动将该分区计算任务转移到其他可用节点上进行处理,从而确保整个计算过程不会中断。
总之,RDD机制通过持久化机制和容错机制来提高了计算效率和可靠性,并且可以在分布式环境中灵活应用。这让Spark成为了一个高性能和可靠的大数据处理框架。
转载自:https://blog.csdn.net/u014727709/article/details/132509716
欢迎
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。