赞
踩
Rdd的支持两种类型的算子操作,一类是Transformations,一类是Action算子。
Transformations算子又叫转换算子,是基于一个已存在的RDD做转换处理后生成一个新的RDD。
也就是说当代码运行到Transformations算子时并没有立即发生计算产生结果,而仅仅是记录了这一计算的规则,等到后续有Action算子来触发时才发生真正的计算处理。
Action算子也叫行动算子,如foreach、collect、count等。Action类算子是触发执行,一个application应用程序中有几个action类算子执行,就有几个job运行。
说明:对源RDD中的每一个元素,作为map函数的输入进行计算处理生成一个新的RDD,一个输入对应一个输出
import org.apache.spark.api.java.function.{MapFunction}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/*
* 自定义mapFunction函数类,实现MapFunction接口
*
* */
class MyMapFunctionClass extends MapFunction[String,SensorReading]{
override def call(value: String): SensorReading = {
val arr = value.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
}
}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
/*
* 自定义map函数,函数的输入是源RDD中的一个元素,返回值是对元素处理过后对象
* */
def myMapFunction(value:String):SensorReading={
val arr = value.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
}
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
//4.使用map算子转换为RDD[Sensor]
//4.1 使用匿名函数的方式
val rdd3:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
rdd3.foreach(data=>{
println("匿名函数:"+data)
})
//4.2 使用自定义函数的方式实现
val rdd4:RDD[SensorReading] = rdd1.map(myMapFunction);
rdd4.foreach(data=>{
println("自定义函数:"+data)
})
//4.3 使用自定义函数类的实现
val rdd5:RDD[SensorReading] = rdd1.map(new MyMapFunctionClass().call)
rdd5.foreach(data=>{
println("自定义函数类:"+data)
})
//4.关闭sparkContext
sparkContext.stop()
}
}
说明:对源RDD中的每一个元素,作为map函数的输入进行计算处理生成一个新的RDD,一个输入对应一个或者多个输出
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
/*
* 自定义flatMap函数,函数的输入是源RDD中的一个元素,返回值是对元素处理过后对象的一个集合
* */
def myFlatMapFunction(value:String):Array[SensorReading]={
val arr = value.split(",")
val obj = SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
Array(obj)
}
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
//4.使用flatMap算子转换为RDD[Sensor]
//4.1 使用匿名函数的方式,返回的是一个集合
val rdd3:RDD[SensorReading] = rdd1.flatMap(data=>{
val arr = data.split(",")
Array(SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
})
rdd3.foreach(data=>{
println("匿名函数:"+data)
})
//4.2 使用自定义函数的方式实现
val rdd4:RDD[SensorReading] = rdd1.flatMap(myFlatMapFunction);
rdd4.foreach(data=>{
println("自定义函数:"+data)
})
//4.关闭sparkContext
sparkContext.stop()
}
}
说明:过滤符合条件的记录,true保留,false过滤
import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/*
* 自定义FilterFunct类实现,实现FilterFunction接口
*
* */
class MyFilterFunction(temperature:Double) extends FilterFunction[SensorReading]{
override def call(value: SensorReading): Boolean = {
value.temperature>=temperature
}
}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
/*
* 自定义filter函数,函数的输入是源RDD中的一个元素,返回值是boolean类型,为True的保留,False的被过滤掉
* */
def myFilter(value:SensorReading):Boolean={
value.temperature>=40
}
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
//4.使用flatMap算子转换为RDD[Sensor]
//4.1 使用匿名函数的方式,返回的是一个集合
val rdd3:RDD[SensorReading] = rdd1.flatMap(data=>{
val arr = data.split(",")
Array(SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
})
//5 filter过滤
//5.1 匿名函数实现
val rdd4:RDD[SensorReading] = rdd3.filter(data=>{
data.temperature>=40
})
rdd4.foreach(data=>{
println("匿名函数实现:"+data)
})
//5.1 自定义函数实现
val rdd5:RDD[SensorReading] = rdd3.filter(myFilter)
rdd5.foreach(data=>{
println("自定义函数实现:"+data)
})
//5.2 自定义函数类实现
val rdd6:RDD[SensorReading] = rdd3.filter(new MyFilterFunction(40).call)
rdd6.foreach(data=>{
println("自定义函数类实现:"+data)
})
//4.关闭sparkContext
sparkContext.stop()
}
}
说明:与map类似,只不过映射的对象不是RDD中的元素,而是RDD中的一个个partition相当于是批量处理Map的批量处理。
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
/*
* 自定义函数实现
* */
def myMapPartitions(value:Iterator[String]):Iterator[SensorReading]={
var data_list:List[SensorReading] = List()
for(item <- value){
val arr = item.split(",")
val obj = SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
data_list = data_list :+ obj
}
data_list.iterator
}
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
//4.使用mapPartitons算子转换为RDD[Sensor]
//4.1 使用匿名函数的方式,返回的是一个集合
val rdd3:RDD[SensorReading] = rdd1.mapPartitions(data=>{
var data_list:List[SensorReading] = List()
while(data.hasNext){
val value:String = data.next()
val arr = value.split(",")
val obj = SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
data_list = data_list :+ obj
}
data_list.iterator
})
rdd3.foreach(data=>{
println("匿名函数实现:"+data)
})
//4.2 使用自定义函数方式实现
val rdd4:RDD[SensorReading] = rdd1.mapPartitions(myMapPartitions)
rdd3.foreach(data=>{
println("自定义函数实现:"+data)
})
//4.关闭sparkContext
sparkContext.stop()
}
}
说明:与mapPartitons类似,通过index遍历RDD中的一个个partitions进行映射处理。
val rdd4:RDD[SensorReading] = rdd1.mapPartitionsWithIndex((index,partition)=>{
var data_list:List[SensorReading] = List()
while(partition.hasNext){
val value:String = partition.next()
val arr = value.split(",")
val obj = SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
data_list = data_list :+ obj
}
data_list.iterator
})
说明:随机抽样算子,根据传进去的小数按比例进行又回放或者无回放的抽样
val rdd5:RDD[SensorReading] = rdd4.sample(true,0.2,100001)
rdd5.foreach(data=>{
println("sample:"+data)
})
sample的三个参数:
说明:合并两个RDD,RDD的元素的数据类型必须一致
val rdd6 = rdd4.union(rdd5)
说明:返回两个数据集的交集
val rdd6 = rdd4.intersection(rdd5)
说明:返回两个数据集的差集
val rdd6 = rdd4.subtract(rdd5)
说明:数据集去重
val rdd6 = rdd4.distinct()
说明:对RDD[V]类型的数据集做分组,通过指定V的某一属性进行分组,生成(K,Iterable[V])类型的RDD
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
val rdd4 = rdd2.groupBy(data=>{
data.id
})
说明:对(K,V)类型的RDD数据集,通过Key对数据集中的元素进行分组,生成(K,Iterable[V])类型的RDD
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
val rdd3:RDD[(String,SensorReading)] = rdd2.map(data=>{
(data.id,data)
})
val rdd4 = rdd3.groupByKey()
说明:对(K,V)类型的RDD数据集,通过Key分组做reduce运算,类似于MapReduce中的reduce。
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
val rdd3:RDD[(String,SensorReading)] = rdd2.map(data=>{
(data.id,data)
})
//找出每一組中的溫度的最大值
val rdd4:RDD[(String,SensorReading)] = rdd3.reduceByKey((value1:SensorReading,value2:SensorReading)=>{
if(value1.temperature>value2.temperature){
value1
}else{
value2
}
})
说明: 对PairRDD(K,V)形式的RDD中相同的Key值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey’函数最终返回的类型还是PairRDD,对应的结果是Key和聚合后的值,而aggregate函数直接返回的是非RDD的结果。
函数原型:
/*
* Aggregate the values of each key, using given combine functions and a neutral "zero value".
This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce.
The former operation is used for merging values within a partition, and the latter is used for merging values between partitions.
To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
*/
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
示例:求每一组温度的最大值
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object MyAggregateFunction{
var zeroValue:Double = 0.0
/*
* 对每一个分区中的每一条记录进行处理,传递进来的初始值是zeroValue
* */
def seqOp(acc: Double,data:SensorReading):Double={
if(data.temperature>acc) data.temperature else acc
}
/*
* 对每个分区的结果再做一次合并操作,得到最后的结果
*
* */
def comOp(data1:Double,data2:Double):Double={
if(data1>data2) data1 else data2
}
}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
val rdd3:RDD[(String,SensorReading)] = rdd2.map(data=>{
(data.id,data)
})
val rdd4 = rdd3.aggregateByKey(MyAggregateFunction.zeroValue)(MyAggregateFunction.seqOp,MyAggregateFunction.comOp)
rdd4.foreach(println)
//4.关闭sparkContext
sparkContext.stop()
}
}
求平均值案例
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
class Acc(_sum:Double,_count: Long) extends Serializable {
var sum:Double = _sum
var count:Long = _count
override def toString: String = {
return "Acc("+this.sum+","+this.count+")"
}
}
object MyAggregateFunction{
var zeroValue:Acc = new Acc(0.0,0)
/*
* 对每一个分区中的每一条记录进行处理,传递进来的初始值是zeroValue
* */
def seqOp(acc: Acc,data:SensorReading):Acc={
acc.count = acc.count+1
acc.sum += data.temperature
acc
}
/*
* 对每个分区的结果再做一次合并操作,得到最后的结果
*
* */
def comOp(data1:Acc,data2:Acc):Acc={
println("data1:"+data1.count+","+data1.sum)
println("data2:"+data2.count+","+data2.sum)
data1.sum+=data2.sum
data1.count+=data2.count
data1
}
}
case class SensorReading(id:String,timestamp:BigInt,temperature:Double)
object RddTest {
def main(args: Array[String]): Unit = {
//1.创建sparkConf
val sparkConf:SparkConf = new SparkConf()
sparkConf.setAppName("RddTest")
sparkConf.setMaster("local")
//2.创建SparkContext
val sparkContext:SparkContext = new SparkContext(sparkConf)
//3.从文件中创建数据集
val inpath:String = "D:\\javaworkspace\\BigData\\Spark\\SparkApp\\src\\main\\resources\\sensor.txt"
val rdd1:RDD[String] = sparkContext.textFile(inpath,2)
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
val rdd3:RDD[(String,SensorReading)] = rdd2.map(data=>{
(data.id,data)
})
val rdd4 = rdd3.aggregateByKey(MyAggregateFunction.zeroValue)(MyAggregateFunction.seqOp,MyAggregateFunction.comOp).map(data=>{
(data._1,data._2.sum/data._2.count)
})
rdd4.foreach(println)
//4.关闭sparkContext
sparkContext.stop()
}
}
说明:对(K,V)类型的RDD数据集通过Key对数据集进行排序
val rdd4 = rdd3.sortByKey()
说明:连接算子,类似于SQL中的join,作用在K,V格式的RDD上,根据K进行连接,对(K,V) join (K,W) 返回(K,(V,W)),同SQL中的join联表查询
//默认就是左外连接
rdd3.join(rdd4)
//左外连接
rdd3.leftOuterJoin(rdd4)
//右外连接
rdd3.rightOuterJoin(rdd4)
//全连接
rdd3.fullOuterJoin(rdd4)
说明:连接算子,作用在K,V格式的RDD上,根据K进行连接,对(K,V) cogroup (K,W) 返回(K,(Iterable,Iterable))
val rdd5 = rdd3.cogroup(rdd4)
说明:对RDD[U]和RDD[V]类型的数据集做笛卡尔积运算,返回两个RDD的笛卡尔积,生成RDD[(U,V)]类型的RDD
rdd2.cartesian(rdd2)
说明:coalesce常用来减少分区,第二个参数是减少分区的过程中是否产生shuffle。
true为产生shuffle,false不产生shuffle。默认是false。
//参数一:指定分区数
//参数二:是否产生shuffle
rdd2.coalesce(3,false)
说明:对RDD重分区
//重分区为3个partitions
val RDD3 = rdd2.repartition(3)
说明:将计算结果回收到Driver端。
val data = rdd1.collect()
println(data)
说明:返回数据集中元素的个数,会在结果计算完毕后将结果传输给Driver端。
val count = rdd1.count()
说明:返回数据集中的第一个元素,会将结果传输给Driver端。
val first = rdd4.first()
说明:take(n),返回集合中的前N个元素
val first = rdd1.take(1)
说明:对RDD[U]数据集进行抽样,返回一个Array[U]的集合,并将结果传输给Driver端。
/*
* 参数一:抽完样后是否放回数据池中,如果为true,则下次还有可能抽中该元素
* 参数二:抽取的样本数
* 参数三:抽样种子,同sample()算子,如果种子相同,每次抽取的样本数都一样。
*/
val sample = rdd4.takeSample(true,3,100000)
sample.foreach(println)
说明 取出RDD中的值从小到大最小的N个元素,可以通过自定义排序规则。
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
/*自定义排序器,按照温度排序,取出温度最小的2个*/
val res = rdd2.takeOrdered(2)(new Ordering[SensorReading]() {
override def compare(x: SensorReading, y: SensorReading): Int = {
x.temperature.compareTo(y.temperature)
}
})
res.foreach(println)
说明:将RDD以TXT的方式保存到文件中,参数inpath填需要保存的目录即可,spark会在该目录下将文件按照分区保存。
rdd3.saveAsTextFile("./test")
说明:功能同saveAsTextFile,只是数据的序列化方式不同。
rdd3.saveAsSequenceFile("./test")
说明:功能同saveAsTextFile,只是数据的序列化方式不同。
rdd3.saveAsOjbectFile("./test")
说明:reduce需要和reduceByKey区分,reduceByKey是一个Transformation算子,而reduce是一个Action算子。reduce作用在RDD上,对RDD的元素做reduce操作,并将结果返回给Driver,返回类型同RDD元素的类型一致。
val rdd2:RDD[SensorReading] = rdd1.map(data=>{
val arr = data.split(",")
SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
})
/*
* 求所有传感器中的最高温度
*/
val res = rdd2.reduce((data1,data2)=>{
if(data1.temperature>data2.temperature){
data1
}else{
data2
}
})
说明:对(K,U)类型的RDD[(K,U)]通过Key分组统计每组元素的个数,返回一个MAP[K,Long]的结果,并在计算完毕后将结果传输给Driver端。
val rdd3:RDD[(String,SensorReading)] = rdd2.map(data=>{
(data.id,data)
})
val keyCount = rdd3.countByKey()
keyCount.foreach(println)
说明:遍历数据集中的每一个元素,并用指定的function对数据集中的元素进行处理
rdd.foreach(println)
rdd.foreach(data=>{
/*对传入的每一个元素进行处理,但是该方法没有返回值*/
println("foreach"+data)
})
说明:遍历数据集中的每一个partition
rdd3.foreachPartition(data=>{
while(data.hasNext){
print(data.next())
}
})
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。