当前位置:   article > 正文

SparkCore系列-2、RDD 创建_spark如何创建一个sc对象

spark如何创建一个sc对象

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

大数据系列文章目录

官方网址http://spark.apache.org/https://databricks.com/spark/about
在这里插入图片描述

如何将数据封装到RDD集合中?

如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。

在这里插入图片描述
官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds

并行化集合

由一个已经存在的 Scala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
在这里插入图片描述
演示范例代码:从List列表构建RDD集合。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 *	Spark 采用并行化的方式构建Scala集合Seq中的数据为RDD
 *	- 将Scala集合转换为RDD
 *	sc.parallelize(seq)
 *	- 将RDD转换为Scala中集合
 *	rdd.collect()
 *	rdd.collectAsMap()
 */
object SparkParallelizeTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }

    // TODO: 1、Scala中集合Seq序列存储数据val linesSeq: Seq[String] = Seq(
    val linesSeq: Seq[String] = Seq(
      "hadoop scala		hive spark scala sql sql",
       "hadoop scala spark hdfs hive	spark",
      "spark hdfs	spark hdfs scala hive spark"
    )

    // TODO: 2、并行化集合创建RDD数据集
    /*
    def parallelize[T: ClassTag]( seq: Seq[T],
    numSlices: Int = defaultParallelism
    ): RDD[T]
    */

    val inputRDD: RDD[String] = sc.makeRDD(linesSeq, numSlices = 2)

    // TODO: 3、调用集合RDD中函数处理分析数据
    val resultRDD: RDD[(String, Int)] = inputRDD
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)

    // TODO: 4、保存结果RDD到外部存储系统(HDFS、MySQL、HBase。。。。) resultRDD.foreach(println)
    resultRDD.foreach(println)
    // 应用程序运行结束,关闭资源
    sc.stop()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

其中文件路径:最好是全路径,可以指定文件名称,可以指定文件目录,可以使用通配符指定。实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS 上Block块数目。

小文件读取

在实际项目中,有时往往处理的数据文件属于小文件(每个文件数据数据量很小,比如KB, 几十MB等),文件数量又很大,如果一个个文件读取为RDD的一个个分区,计算数据时很耗时性能低下,使用SparkContext中提供:wholeTextFiles类,专门读取小文件数据。
在这里插入图片描述
范例演示:读取100个小文件数据,每个文件大小小于1MB,设置RDD分区数目为2。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 * 采用SparkContext#wholeTextFiles()方法读取小文件
 */
object SparkWholeTextFileTest {
  def main(args: Array[String]): Unit = {
    // 创建应用程序入口SparkContext实例对象
    val sc: SparkContext = {
      // 1.a 创建SparkConf对象,设置应用的配置信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // 1.b 传递SparkConf对象,构建Context实例
      new SparkContext(sparkConf)
    }

    // TODO: wholeTextFiles()
    val inputRDD: RDD[String] = sc
      .wholeTextFiles("datas/ratings100/", minPartitions = 2)
      .flatMap(tuple => tuple._2.split("\\n"))

    println(s"Partitions Number = ${inputRDD.getNumPartitions}")
    println(s"Count = ${inputRDD.count()}")

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

实际项目中,可以先使用wholeTextFiles方法读取数据,设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。

RDD 分区数目

在讲解 RDD 属性时,多次提到了分区(partition)的概念。分区是一个偏物理层的概念,也是 RDD 并行计算的核心。数据在 RDD 内部被切分为多个子集合,每个子集合可以被认为是一个分区,运算逻辑最小会被应用在每一个分区上,每个分区是由一个单独的任务(task)来运行的, 所以分区数越多,整个应用的并行度也会越高。

获取RDD分区数目两种方式:

在这里插入图片描述

下回分解

下篇就要开始介绍RDD的函数了,正式开始使用RDD的转换。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号