赞
踩
高速处理:Spark利用内存计算和优化的执行计划,能够实现比传统MapReduce更高效的数据处理。
容错性:Spark提供了弹性分布式数据集(RDD)作为基本抽象,通过RDD的容错性和恢复机制,保证了数据处理的稳定性。
多语言支持:Spark支持Scala、Java、Python和R等多种编程语言,使得开发者能够使用自己熟悉的语言进行开发。
生态系统
Spark Core:Spark核心,提供底层框架和核心支持。
BlinkDB:一个用于在海量数据上运行交互式SQL查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度被控制在允许的误差范围内。
SparkSQL: 可以执行SQL查询,包括基本的SQL语法和HiveQL语法。读取的数据源包括Hive表、Parquent文件、JSON数据、关系数据库(如MYSQL等)。
Spark Streaming:流式计算。比如,一个网站的流量是每时每刻都在发生的,如果需要知道过去15分钟或一个小时的流量,则可以使用Spark Streaming来解决这个问题。
MLBase:MLbase是Spark生态圈的一部分,专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。
Mlib:MLbase的一部分,MLlib是Spark的数据挖掘算法库,实现了一些常见的机器学习算法和实用程序。
GraphX:图计算的应用在很多情况下处理的数据都是很庞大的,比如在移动社交上面的关系等都可以用图相关算法来进行处理和挖掘,但是如果用户要自行编写相关的图计算算法,并且要在集群中应用,那么难度是非常大的。而使用Spark GraphX就可以解决这个问题,它里面内置了很多的图相关算法。
RDD(Resilient Distributed Dataset):弹性分布式数据集,是Spark中的基本数据抽象,代表了分布在集群中的不可变数据集合。
分区(Partition):RDD中的数据会被分割成多个分区,每个分区存储在集群的不同节点上进行并行处理。
Spark应用:Spark应用是由Spark程序组成的,通常包括一个驱动程序(Driver Program)和多个执行器(Executors),驱动程序负责将任务分发给执行器进行处理。
作业(Job):作业是Spark中最高层次的执行单元,通常对应一个动作(Action)操作。
惰性求值(Lazy Evaluation):Spark中的转换操作并不会立即执行,而是等到遇到动作操作时才会触发实际的计算。
RDD窄依赖(Narrow Dependency)和宽依赖(Wide Dependency):RDD之间的依赖关系分为窄依赖和宽依赖,窄依赖表示每个父RDD分区最多只被一个子RDD分区使用,宽依赖表示每个父RDD分区可能被多个子RDD分区使用。
RDD持久化(RDD Persistence):RDD持久化可以将RDD缓存到内存或磁盘中,避免重复计算,提高计算性能。
- (1)
- val arr = Array(1,2,3,4,5,6)
- val rdd = sc.parallelize(arr)
- val sum = rdd.reduce(_ + _)
- println(sum)
- (2)
- val x = sc.parallelize(List(1,2,3,4))
- val x = x
- .map(x => x * x)
- print(x.collect().mkString(";"))
- (3)
- val x = sc.parallelize(
- List("101 張三 95","102 李四 90","103 王五 96")
- )
- val y = x.map{ x => var t = x.split(" "); (t(0),t(1),t(2).toInt) }
- y.filter(_._3 >= 95).map{x => (x._1,x._2)}.collect().foreach(println)
- (4)
- val x = sc.parallelize(1 to 9,3).map(x => (x % 3,x)).mapValues( v => v.toDouble)
- val y = x.combineByKey(
- (v:Double) => (v,1),
- (c:(Double,Int),v:Double) => (c._1 + v, c._2 + 1),
- (c1:(Double,Int),c2:(Double,Int)) => (c1._1 + c2._1,c1._2 + c2._2)
- )
- y.collect().foreach(print)
- (5)
- val goods = sc.parallelize(List("radio 30 50", "soap 3 60", "cup 6 50", "bowl 4 80"))
- val goodsTup = goods.map { x => val splits = x.split(" "); (splits(0), splits(1).toDouble, splits(2).toInt) }
- goodsTup.sortBy(_._1).collect().foreach(println)
- goodsTup.sortBy(x => x._2, false).collect.foreach(println)
val arr = Array(1,2,3,4,5,6)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _) println(sum)
创建一个包含1到6的数组,将数组转换为RDD,对RDD执行reduce操作求和,并打印结果。
运行结果:21
val x = sc.parallelize(List(1,2,3,4))
val x = x .map(x => x * x)
print(x.collect().mkString(";"))
创建一个包含1到4的RDD,对RDD执行平方操作,然后将RDD中的元素收集并用分号连接后打印。
运行结果:1;4;9;16
val x = sc.parallelize( List("101 張三 95","102 李四 90","103 王五 96") )
val y = x.map{ x => var t = x.split(" "); (t(0),t(1),t(2).toInt) }
y.filter(_._3 >= 95).map{x => (x._1,x._2)}.collect().foreach(println)
创建一个包含学生信息的RDD,将每个学生信息拆分为学号、姓名和成绩,然后过滤出成绩大于等于95的学生,并打印他们的学号和姓名。
运行结果: (101,張三) (103,王五)
val x = sc.parallelize(1 to 9,3).map(x => (x % 3,x)).mapValues( v => v.toDouble)
val y = x.combineByKey( (v:Double) => (v,1), (c:(Double,Int),v:Double) => (c._1 + v, c._2 + 1), (c1:(Double,Int),c2:(Double,Int)) => (c1._1 + c2._1,c1._2 + c2._2) )
y.collect().foreach(print)
创建一个包含1到9的RDD,并按照分区数为3进行分区,然后对RDD中的元素进行取模操作并转换为元组,接着执行combineByKey操作,对具有相同键的值进行聚合操作,最后打印结果。
运行结果:(18.0,6)(15.0,6)(12.0,6)
val goods = sc.parallelize(List("radio 30 50", "soap 3 60", "cup 6 50", "bowl 4 80"))
val goodsTup = goods.map { x => val splits = x.split(" "); (splits(0), splits(1).toDouble, splits(2).toInt) }
goodsTup.sortBy(_._1).collect().foreach(println)
goodsTup.sortBy(x => x._2, false).collect.foreach(println)
创建一个包含商品信息的RDD,将每个商品信息拆分为名称、价格和库存,并分别按名称和价格排序后打印。
运行结果: 按名称排序后: (bowl,4.0,80) (cup,6.0,50) (radio,30.0,50) (soap,3.0,60) 按价格降序排序后: (radio,30.0,50) (soap,3.0,60) (bowl,4.0,80) (cup,6.0,50)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。