赞
踩
RDD: 弹性分布式数据集(Resilient Distributed Datasets)
核心概念:Spark的核心数据抽象。
通过对RDD的理解和使用,可以在分布式计算环境中高效地处理和计算大规模数据
DAG(有向无环图):反映了RDD之间的依赖关系。
Stage:RDD和DAG是Spark提供的核心抽象,RDD的操作会生成DAG,DAG会进一步被划分为多个Stage,每个Stage包含多个Task。
这里用的是scala语言的maven项目
<!-- 导入 spark-core jar 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
// 引入 Spark 库
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
setMaster (设置运行模式) 方法的可选方案:
local
: 在单核上运行local[N]
: 在指定数量的 N 个核上运行,如 “local[4]”local[*]
: 使用所有可用的核spark://HOST:PORT
: 连接到指定的 Spark standalone clusteryarn
: 连接到 YARN 集群mesos://HOST:PORT
: 连接到 Mesos 集群
val conf = new SparkConf()
.setAppName("Spark RDD Example")// 设置应用程序名称
.setMaster("local[*]") // 设置运行模式
val sc = new SparkContext(conf)
// sc.setLogLevel() // 设置日志显示级别
从集合创建 RDD,指定分区数
val rdd: RDD[T] = sc.parallelize(seq: Seq[T], numSlices: Int) // ✔
val rdd: RDD[T] = sc.makeRDD(seq: Seq[T], numSlices: Int) // 调用了 parallelize
从外部数据源创建 RDD,指定最小分区数
从文件系统中的单个文件创建 RDD
- 本地文件系统使用
file:///
前缀- Hadoop 文件系统使用
hdfs://
前缀
// 从文件系统创建 RDD,可以通过 minPartitions 指定分区数
val textRDD: RDD[String] = sc.textFile(filePath, minPartitions:Int) // 从文件系统创建 RDD
val rdd: RDD[(String, String)] = sc.wholeTextFiles(dir: String, minPartitions: Int) // 从目录创建 RDD
附加单词次数统计
import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD object SparkRDDExample { def main(args: Array[String]): Unit = { // 配置 Spark val conf = new SparkConf().setAppName("Spark RDD Example").setMaster("local[*]") val sc = new SparkContext(conf) // 从集合创建 RDD,指定分区数 val data = Seq(1, 2, 3, 4, 5) val rdd: RDD[Int] = sc.parallelize(data, numSlices = 2) rdd.collect().foreach(println) // 从外部数据源创建 RDD,指定最小分区数 val filePath = "file:///F:\\sparkRDD\\spark01\\data\\story.txt" val textRDD: RDD[String] = sc.textFile(filePath, minPartitions = 4) textRDD.collect().foreach(println) // 将文本文件中的每行数据拆分为单词并统计每个单词的出现次数 val wordCountRDD = textRDD.mapPartitions { _.flatMap { _.split("[^a-zA-Z]+") // 按非字母字符拆分字符串 .map(word => (word, 1)) // 将每个单词转换为 (单词, 1) 的元组 } } .reduceByKey(_+_) // 显示单词计数结果 println("Word count from textFile:") wordCountRDD.collect().foreach(println) // 停止 SparkContext sc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。