赞
踩
def first(): T
first返回RDD中的第一个元素,不排序。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
scala> rdd1.first
res14: (String, String) = (A,1)
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.first
res8: Int = 10
def count(): Long
count返回RDD中的元素数量。
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
scala> rdd1.count
res15: Long = 3
def reduce(f: (T, T) ⇒ T): T
根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.reduce(_ + _)
res18: Int = 55
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
scala> rdd2.reduce((x,y) => {
| (x._1 + y._1,x._2 + y._2)
| })
res21: (String, Int) = (CBBAA,6)
def collect(): Array[T]
collect用于将一个RDD转换成数组。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
def take(num: Int): Array[T]
take用于获取RDD中从0到num-1下标的元素,不排序。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
scala> rdd1.take(1)
res0: Array[Int] = Array(10)
scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)
def top(num: Int)(implicit ord: Ordering[T]): Array[T]
top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3)) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21 scala> rdd1.top(1) res2: Array[Int] = Array(12) scala> rdd1.top(2) res3: Array[Int] = Array(12, 10) //指定排序规则 scala> implicit val myOrd = implicitly[Ordering[Int]].reverse myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef scala> rdd1.top(1) res4: Array[Int] = Array(2) scala> rdd1.top(2) res5: Array[Int] = Array(2, 3)
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
takeOrdered和top类似,只不过以和top相反的顺序返回元素。
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
scala> rdd1.top(1)
res4: Array[Int] = Array(12)
scala> rdd1.top(2)
res5: Array[Int] = Array(12, 10)
scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(2)
scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(2, 3)
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。
var rdd1 = sc.makeRDD(1 to 10,2) rdd1.mapPartitionsWithIndex{ (partIdx,iter) => { var part_map = scala.collection.mutable.Map[String,List[Int]]() while(iter.hasNext){ var part_name = "part_" + partIdx; var elem = iter.next() if(part_map.contains(part_name)) { var elems = part_map(part_name) elems ::= elem part_map(part_name) = elems } else { part_map(part_name) = List[Int]{elem} } } part_map.iterator } }.collect res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
第一个分区中包含5,4,3,2,1
第二个分区中包含10,9,8,7,6
scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res17: Int = 58
结果为什么是58,看下面的计算过程:
先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1
即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1
即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
再比如:
scala> rdd1.aggregate(2)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a * b}
| )
res18: Int = 1428
这次zeroValue=2
part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
最后:zeroValuepart_0part_1 = 2 * 17 * 42 = 1428
因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。
var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(
| (x,y) => x + y
| )
res19: Int = 58
##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
| {(x,y) => x + y},
| {(a,b) => a + b}
| )
res20: Int = 58
def lookup(key: K): Seq[V]
lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)
scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
def countByKey(): Map[K, Long]
countByKey用于统计RDD[K,V]中每个K的数量。
scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
def foreach(f: (T) ⇒ Unit): Unit
foreach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
在Spark1.4中是这样,不知道是否真如此。
这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。
scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> rdd1.foreach(x => cnt += x)
scala> cnt.value
res51: Int = 55
scala> rdd1.collect.foreach(println)
def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
foreachPartition和foreach类似,只不过是对每一个分区使用f。
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0
scala> rdd1.foreachPartition { x => {
| allsize += x.size
| }}
scala> println(allsize.value)
10
def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
sortBy根据给定的排序k函数将RDD中的元素进行排序。
scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2) scala> rdd1.sortBy(x => x).collect res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序 scala> rdd1.sortBy(x => x,false).collect res2: Array[Int] = Array(7, 6, 3, 2, 1, 0) //降序 //RDD[K,V]类型 scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) scala> rdd1.sortBy(x => x).collect res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7)) //按照V进行降序排序 scala> rdd1.sortBy(x => x._2,false).collect res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
def saveAsTextFile(path: String): Unit
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。
codec参数可以指定压缩的类名。
var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r-- 2 lxw1234 supergroup 21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
hadoop fs -cat /tmp/lxw1234.com/part-00000
注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。
//指定压缩格式保存
rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/”,classOf[com.hadoop.compression.lzo.LzopCodec])
hadoop fs -ls /tmp/lxw1234.com
-rw-r--r-- 2 lxw1234 supergroup 0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r-- 2 lxw1234 supergroup 71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
hadoop fs -text /tmp/lxw1234.com/part-00000.lzo
saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
用法同saveAsTextFile。
def saveAsObjectFile(path: String): Unit
saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
对于HDFS,默认采用SequenceFile保存。
var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。
可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
classOf[com.hadoop.compression.lzo.LzopCodec])
def saveAsHadoopDataset(conf: JobConf): Unit
saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
在JobConf中,通常需要关注或者设置五个参数:
文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.mapred.JobConf var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7))) var jobConf = new JobConf() jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]]) jobConf.setOutputKeyClass(classOf[Text]) jobConf.setOutputValueClass(classOf[IntWritable]) jobConf.set("mapred.output.dir","/tmp/lxw1234/") rdd1.saveAsHadoopDataset(jobConf)
结果:
hadoop fs -cat /tmp/lxw1234/part-00000
A 2
A 1
hadoop fs -cat /tmp/lxw1234/part-00001
B 6
B 3
B 7
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.mapred.TextOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.io.ImmutableBytesWritable var conf = HBaseConfiguration.create() var jobConf = new JobConf(conf) jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3") jobConf.set("zookeeper.znode.parent","/hbase") jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") jobConf.setOutputFormat(classOf[TableOutputFormat]) var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) rdd1.map(x => { var put = new Put(Bytes.toBytes(x._1)) put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) (new ImmutableBytesWritable,put) } ).saveAsHadoopDataset(jobConf)
hbase(main):005:0> scan 'lxw1234'
ROW COLUMN+CELL
A column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02
B column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06
C column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07
3 row(s) in 0.0550 seconds
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。
用法基本同saveAsHadoopFile。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。
以写入HBase为例:
HBase建表:
create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
完整的Spark应用程序:
package com.lxw1234.test import org.apache.spark.SparkConf import org.apache.spark.SparkContext import SparkContext._ import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.client.Put object Test { def main(args : Array[String]) { val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com") val sc = new SparkContext(sparkConf); var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7))) sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3") sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase") sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234") var job = new Job(sc.hadoopConfiguration) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Result]) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) rdd1.map( x => { var put = new Put(Bytes.toBytes(x._1)) put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2)) (new ImmutableBytesWritable,put) } ).saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop() } }
注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。