Reposting a set of RDD examples from a well-known overseas Spark expert.
Original link: http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Our research group has a very strong focus on using and improving Apache Spark to solve real world problems. In order to do this we need to have a very solid understanding of the capabilities of Spark. So one of the first things we have done is to go through the entire Spark RDD API and write examples to test their functionality. This has been a very useful exercise and we would like to share the examples with everyone.
Authors of examples: Matthias Langer and Zhen He
Email addresses: m.langer@latrobe.edu.au, z.he@latrobe.edu.au
These examples have only been tested for Spark version 1.1. We assume the functionality of Spark is stable and therefore the examples should be valid for later releases.
The RDD API By Example
RDD is short for Resilient Distributed Dataset. RDDs are the workhorse of the Spark system. As a user, one can consider an RDD as a handle for a collection of individual data partitions, which are the result of some computation.
However, an RDD is actually more than that. On cluster installations, separate data partitions can be on separate nodes. Using the RDD as a handle one can access all partitions and perform computations and transformations using the contained data. Whenever a part of a RDD or an entire RDD is lost, the system is able to reconstruct the data of lost partitions by using lineage information. Lineage refers to the sequence of transformations used to produce the current RDD. As a result, Spark is able to recover automatically from most failures.
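To make the lineage idea concrete, here is a minimal sketch (not part of the original example set; it assumes a spark-shell session where sc is predefined). toDebugString prints the chain of transformations Spark would replay to rebuild lost partitions:

val base = sc.parallelize(1 to 100, 4)          // ParallelCollectionRDD
val even = base.map(_ * 2).filter(_ % 4 == 0)   // lineage: parallelize -> map -> filter
println(even.toDebugString)                     // prints the recorded lineage of the RDD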
All RDDs available in Spark derive either directly or indirectly from the class RDD. This class comes with a large set of methods that perform operations on the data within the associated partitions. The class RDD is abstract. Whenever one uses an RDD, one is actually using a concrete implementation of RDD. These implementations have to override some core functions to make the RDD behave as expected.
One reason why Spark has lately become a very popular system for processing big data is that it does not impose restrictions regarding what data can be stored within RDD partitions. The RDD API already contains many useful operations. But, because the creators of Spark had to keep the core API of RDDs common enough to handle arbitrary data-types, many convenience functions are missing.
The basic RDD API considers each data item as a single value. However, users often want to work with key-value pairs. Therefore Spark extended the interface of RDD to provide additional functions (PairRDDFunctions), which explicitly work on key-value pairs. Currently, there are four extensions to the RDD API available in Spark. They are as follows:
DoubleRDDFunctions
Methods defined in this interface extension contain useful aggregation operations for numeric values. They become available if the data items of an RDD are implicitly convertible to the Scala data type Double.
PairRDDFunctions
Methods defined in this interface extension become available when the data items have a two-component tuple structure. Spark will interpret the first tuple item (i.e. tuplename._1) as the key and the second item (i.e. tuplename._2) as the associated value.
OrderedRDDFunctions
Methods defined in this interface extension become available if the data items are two-component tuples where the key is implicitly sortable.
SequenceFileRDDFunctions
This extension contains several methods that allow users to create Hadoop sequence files from RDDs. The data items must be two-component key-value tuples as required by the PairRDDFunctions. However, there are additional requirements considering the convertibility of the tuple components to Writable types.
Since Spark will make methods with extended functionality automatically available to users when the data items fulfill the above described requirements, we decided to list all possible available functions in strictly alphabetical order. We will append one of the following tags to the function name to indicate it belongs to an extension that requires the data items to conform to a certain format or type (a short sketch after the tag list below illustrates how these extra methods become available).
[Double] - DoubleRDDFunctions
[Ordered] - OrderedRDDFunctions
[Pair] - PairRDDFunctions
[SeqFile] - SequenceFileRDDFunctions
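As a small illustration of how these extensions surface (a sketch added here, not part of the original examples; it assumes a spark-shell session where sc is predefined), key-value methods such as reduceByKey only become available once the RDD's elements are two-component tuples:

val words = sc.parallelize(List("dog", "cat", "dog"), 2)   // RDD[String]: only core RDD methods available
val pairs = words.map(w => (w, 1))                         // RDD[(String, Int)]: PairRDDFunctions now apply
pairs.reduceByKey(_ + _).collect                           // e.g. Array((dog,2), (cat,1)); ordering may vary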
Examples

aggregate
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9

val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggregate("")(_ + _, _+_)
res115: String = abcdef

Note that the zero value is applied once when folding the elements inside each partition and once more when the partition results are combined, and the order in which partition results arrive is not deterministic, which is why a result such as the one below appears.
z.aggregate("x")(_ + _, _+_)
res116: String = xxdefxabc

val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
res141: String = 42
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res142: String = 11

val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res143: String = 10

val z = sc.parallelize(List("12","23","","345"),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
res144: String = 11
cartesian
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
sc.setCheckpointDir("my_directory_name") val a = sc.parallelize(1 to 4) a.checkpoint a.count 14/02/25 18:13:53 INFO SparkContext: Starting job: count at <console>:15 ... 14/02/25 18:13:53 INFO MemoryStore: Block broadcast_5 stored as values to memory (estimated size 115.7 KB, free 296.3 MB) 14/02/25 18:13:53 INFO RDDCheckpointData: Done checkpointing RDD 11 to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/my_directory_name/65407913-fdc6-4ec1-82c9-48a1656b95d6/rdd-11, new parent is RDD 12 res23: Long = 4 |
coalesce, repartition
val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.partitions.length
res9: Int = 2
cogroup [Pair], groupWith [Pair]
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c))),
(3,(ArrayBuffer(b),ArrayBuffer(c))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c)))
)

val d = a.map((_, "d"))
b.cogroup(c, d).collect
res9: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array(
(2,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(3,(ArrayBuffer(b),ArrayBuffer(c),ArrayBuffer(d))),
(1,(ArrayBuffer(b, b),ArrayBuffer(c, c),ArrayBuffer(d, d)))
)

val x = sc.parallelize(List((1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")), 2)
val y = sc.parallelize(List((5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")), 2)
x.cogroup(y).collect
res23: Array[(Int, (Iterable[String], Iterable[String]))] = Array(
(4,(ArrayBuffer(kiwi),ArrayBuffer(iPad))),
(2,(ArrayBuffer(banana),ArrayBuffer())),
(3,(ArrayBuffer(orange),ArrayBuffer())),
(1,(ArrayBuffer(apple),ArrayBuffer(laptop, desktop))),
(5,(ArrayBuffer(),ArrayBuffer(computer))))
collect, toArray
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.collect res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat) |
collectAsMap [Pair]
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collectAsMap
res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
combineByKey [Pair]
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3) val c = b.zip(a) val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y) d.collect res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf))) |
compute
Computes the actual representation of a partition by executing the RDD's dependencies. This method is used internally by Spark and should not be called directly by users.
context, sparkContext
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.context res8: org.apache.spark.SparkContext = org.apache.spark.SparkContext@58c1c2f1 |
count
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.count res2: Long = 4 |
countApprox
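The original lists countApprox without an example. The sketch below is our addition, based on the standard Spark 1.x signature countApprox(timeout, confidence); it returns a PartialResult holding a BoundedDouble estimate.

val a = sc.parallelize(1 to 10000, 20)
val approx = a.countApprox(1000, 0.95)   // wait at most 1000 ms, 95% confidence
approx.initialValue                      // possibly partial estimate (a BoundedDouble)
approx.getFinalValue()                   // blocks until the final result is available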
countByKey [Pair]
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2) c.countByKey res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1) |
countByKeyApprox [Pair]
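No example is given for countByKeyApprox in the original either; a sketch in the same spirit (our addition, assuming the Spark 1.x signature countByKeyApprox(timeout, confidence)) would be:

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
val approx = c.countByKeyApprox(1000, 0.95)   // approximate per-key counts, 1000 ms timeout
approx.getFinalValue()                        // Map of key -> BoundedDouble estimate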
countByValue
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b.countByValue
res27: scala.collection.Map[Int,Long] = Map(5 -> 1, 8 -> 1, 3 -> 1, 6 -> 1, 1 -> 6, 2 -> 3, 4 -> 2, 7 -> 1)
countByValueApprox
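Again no example is provided for countByValueApprox; a sketch following the same pattern (our addition, assuming the Spark 1.x signature countByValueApprox(timeout, confidence)) could look like this:

val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
val approx = b.countByValueApprox(1000, 0.95)   // approximate count of each distinct value
approx.getFinalValue()                          // Map of value -> BoundedDouble estimate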
countApproxDistinct
val a = sc.parallelize(1 to 10000, 20)
val b = a++a++a++a++a
b.countApproxDistinct(0.1)
res14: Long = 8224
b.countApproxDistinct(0.05)
res15: Long = 9750
b.countApproxDistinct(0.01)
res16: Long = 9947
b.countApproxDistinct(0.001)
res0: Long = 10000
val a = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) val b = sc.parallelize(a.takeSample(true, 10000, 0), 20) val c = sc.parallelize(1 to b.count().toInt, 20) val d = b.zip(c) d.countApproxDistinctByKey(0.1).collect res15: Array[(String, Long)] = Array((Rat,2567), (Cat,3357), (Dog,2414), (Gnu,2494)) d.countApproxDistinctByKey(0.01).collect res16: Array[(String, Long)] = Array((Rat,2555), (Cat,2455), (Dog,2425), (Gnu,2513)) d.countApproxDistinctByKey(0.001).collect res0: Array[(String, Long)] = Array((Rat,2562), (Cat,2464), (Dog,2451), (Gnu,2521)) |
dependencies
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:12
b.dependencies.length
Int = 0
b.map(a => a).dependencies.length
res40: Int = 1
b.cartesian(a).dependencies.length
res41: Int = 2
b.cartesian(a).dependencies
res42: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.rdd.CartesianRDD
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat) val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 2 a.distinct(3).partitions.length res17: Int = 3 |
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.first res1: String = Gnu |
filter
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res3: Array[Int] = Array(2, 4, 6, 8, 10)

val b = sc.parallelize(1 to 8)
b.filter(_ < 4).collect
res15: Array[Int] = Array(1, 2, 3)

val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))
a.filter(_ < 4).collect
<console>:15: error: value < is not a member of Any

val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog"))
a.collect({case a: Int => "is integer"
           case b: String => "is string" }).collect
res17: Array[String] = Array(is string, is string, is integer, is string)

val myfunc: PartialFunction[Any, Any] = {
  case a: Int => "is integer"
  case b: String => "is string"
}
myfunc.isDefinedAt("")
res21: Boolean = true
myfunc.isDefinedAt(1)
res22: Boolean = true
myfunc.isDefinedAt(1.5)
res23: Boolean = false

val myfunc2: PartialFunction[Any, Any] = {case x if (x < 4) => "x"}
<console>:10: error: value < is not a member of Any

val myfunc2: PartialFunction[Int, Any] = {case x if (x < 4) => "x"}
myfunc2: PartialFunction[Int,Any] = <function1>
filterWith
val a = sc.parallelize(1 to 9, 3)
val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)
b.collect
res37: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)
a.filterWith(x=> x)((a, b) => b == 0).collect
res30: Array[Int] = Array(1, 2)
a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect
res33: Array[Int] = Array(1, 2, 4, 6, 8, 10)
a.filterWith(x=> x.toString)((a, b) => b == "2").collect
res34: Array[Int] = Array(5, 6)
flatMap
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

// The program below generates a random number of copies (up to 10) of the items in the list.
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.flatMapValues("x" + _ + "x").collect res6: Array[(Int, Char)] = Array((3,x), (3,d), (3,o), (3,g), (3,x), (5,x), (5,t), (5,i), (5,g), (5,e), (5,r), (5,x), (4,x), (4,l), (4,i), (4,o), (4,n), (4,x), (3,x), (3,c), (3,a), (3,t), (3,x), (7,x), (7,p), (7,a), (7,n), (7,t), (7,h), (7,e), (7,r), (7,x), (5,x), (5,e), (5,a), (5,g), (5,l), (5,e), (5,x)) |
flatMapWith
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2, 8, 2, 9)
fold
val a = sc.parallelize(List(1,2,3), 3)
a.fold(0)(_ + _)
res59: Int = 6
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res84: Array[(Int, String)] = Array((3,dogcatowlgnuant) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.foldByKey("")(_ + _).collect res85: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) |
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3) c.foreach(x => println(x + "s are yummy")) lions are yummy gnus are yummy crocodiles are yummy ants are yummy whales are yummy dolphins are yummy spiders are yummy |
foreachPartition
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3)
b.foreachPartition(x => println(x.reduce(_ + _)))
6
15
24
foreachWith
val a = sc.parallelize(1 to 9, 3)
a.foreachWith(i => i)((x,i) => if (x % 2 == 1 && i % 2 == 0) println(x) )
1
3
7
9
sc.setCheckpointDir("/home/cloudera/Documents") val a = sc.parallelize(1 to 500, 5) val b = a++a++a++a++a b.getCheckpointFile res49: Option[String] = None b.checkpoint b.getCheckpointFile res54: Option[String] = None b.collect b.getCheckpointFile res57: Option[String] = Some(file:/home/cloudera/Documents/cb978ffb-a346-4820-b3ba-d56580787b20/rdd-40) |
getStorageLevel
val a = sc.parallelize(1 to 100000, 2)
a.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
a.getStorageLevel.description
String = Disk Serialized 1x Replicated

a.cache
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
glom
val a = sc.parallelize(1 to 100, 3)
a.glom.collect
res8: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100))
groupBy
val a = sc.parallelize(1 to 9, 3)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int = { a % 2 }
a.groupBy(myfunc).collect
res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int = { a % 2 }
a.groupBy(x => myfunc(x), 3).collect
a.groupBy(myfunc(_), 1).collect
res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

import org.apache.spark.Partitioner

class MyPartitioner extends Partitioner {
  def numPartitions: Int = 2
  def getPartition(key: Any): Int = {
    key match {
      case null     => 0
      case key: Int => key % numPartitions
      case _        => key.hashCode % numPartitions
    }
  }
  override def equals(other: Any): Boolean = {
    other match {
      case h: MyPartitioner => true
      case _                => false
    }
  }
}

val a = sc.parallelize(1 to 9, 3)
val p = new MyPartitioner()
val b = a.groupBy((x:Int) => { x }, p)
val c = b.mapWith(i => i)((a, b) => (b, a))
c.collect
res42: Array[(Int, (Int, Seq[Int]))] = Array((0,(4,ArrayBuffer(4))), (0,(2,ArrayBuffer(2))), (0,(6,ArrayBuffer(6))), (0,(8,ArrayBuffer(8))), (1,(9,ArrayBuffer(9))), (1,(3,ArrayBuffer(3))), (1,(1,ArrayBuffer(1))), (1,(7,ArrayBuffer(7))), (1,(5,ArrayBuffer(5))))
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) b.groupByKey.collect res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle))) |
histogram [Double]
val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)
a.histogram(5)
res11: (Array[Double], Array[Long]) = (Array(1.1, 2.68, 4.26, 5.84, 7.42, 9.0),Array(5, 0, 0, 1, 4))

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.histogram(6)
res18: (Array[Double], Array[Long]) = (Array(1.0, 2.5, 4.0, 5.5, 7.0, 8.5, 10.0),Array(6, 0, 1, 1, 3, 4))

val a = sc.parallelize(List(1.1, 1.2, 1.3, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 9.0), 3)
a.histogram(Array(0.0, 3.0, 8.0))
res14: Array[Long] = Array(5, 3)

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.histogram(Array(0.0, 5.0, 10.0))
res1: Array[Long] = Array(6, 9)
a.histogram(Array(0.0, 5.0, 10.0, 15.0))
res1: Array[Long] = Array(6, 8, 1)
id
val y = sc.parallelize(1 to 10, 10)
y.id
res16: Int = 19
intersection
val x = sc.parallelize(1 to 20)
val y = sc.parallelize(10 to 30)
val z = x.intersection(y)
z.collect
res74: Array[Int] = Array(16, 12, 20, 13, 17, 14, 18, 10, 19, 15, 11)
sc.setCheckpointDir("/home/cloudera/Documents") c.isCheckpointed res6: Boolean = false c.checkpoint c.isCheckpointed res8: Boolean = false c.collect c.isCheckpointed res9: Boolean = true |
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.join(d).collect res17: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(rabbit,salmon)), (6,(rabbit,rabbit)), (6,(rabbit,turkey)), (6,(turkey,salmon)), (6,(turkey,rabbit)), (6,(turkey,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(cat,dog)), (3,(cat,cat)), (3,(cat,gnu)), (3,(cat,bee)), (3,(gnu,dog)), (3,(gnu,cat)), (3,(gnu,gnu)), (3,(gnu,bee)), (3,(bee,dog)), (3,(bee,cat)), (3,(bee,gnu)), (3,(bee,bee)), (4,(wolf,wolf)), (4,(wolf,bear)), (4,(bear,wolf)), (4,(bear,bear))) |
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) b.collect res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant)) |
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.keys.collect res2: Array[Int] = Array(3, 5, 4, 3, 7, 5) |
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.leftOuterJoin(d).collect res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None))) |
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.lookup(5) res0: Seq[String] = WrappedArray(tiger, eagle) |
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.map(_.length) val c = a.zip(b) c.collect res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8)) |
mapPartitions
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res ::= (pre, cur)
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)
def myfunc(iter: Iterator[Int]) : Iterator[Int] = {
  var res = List[Int]()
  while (iter.hasNext) {
    val cur = iter.next
    res = res ::: List.fill(scala.util.Random.nextInt(10))(cur)
  }
  res.iterator
}
x.mapPartitions(myfunc).collect
// some of the numbers are not output at all, because the random number generated for them is zero.
res8: Array[Int] = Array(1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 5, 7, 7, 7, 9, 9, 10)

The above program can also be written using flatMap:
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res1: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
mapPartitionsWithContext
val a = sc.parallelize(1 to 9, 3)
import org.apache.spark.TaskContext
def myfunc(tc: TaskContext, iter: Iterator[Int]) : Iterator[Int] = {
  tc.addOnCompleteCallback(() => println(
    "Partition: " + tc.partitionId + ", AttemptID: " + tc.attemptId
  ))
  iter.toList.filter(_ % 2 == 0).iterator
}
a.mapPartitionsWithContext(myfunc).collect

14/04/01 23:05:48 INFO SparkContext: Starting job: collect at <console>:20
...
14/04/01 23:05:48 INFO Executor: Running task ID 0
Partition: 0, AttemptID: 0, Interrupted: false
...
14/04/01 23:05:48 INFO Executor: Running task ID 1
14/04/01 23:05:48 INFO TaskSetManager: Finished TID 0 in 470 ms on localhost (progress: 0/3)
...
14/04/01 23:05:48 INFO Executor: Running task ID 2
14/04/01 23:05:48 INFO TaskSetManager: Finished TID 1 in 23 ms on localhost (progress: 1/3)
14/04/01 23:05:48 INFO DAGScheduler: Completed ResultTask(0, 1)
...
res0: Array[Int] = Array(2, 6, 4, 8)
mapPartitionsWithIndex
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
  iter.toList.map(x => index + "," + x).iterator
}
x.mapPartitionsWithIndex(myfunc).collect()
res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex)) |
mapWith
// generates 9 random numbers less than 1000.
val x = sc.parallelize(1 to 9, 3)
x.mapWith(a => new scala.util.Random)((x, r) => r.nextInt(1000)).collect
res0: Array[Int] = Array(940, 51, 779, 742, 757, 982, 35, 800, 15)

val a = sc.parallelize(1 to 9, 3)
val b = a.mapWith("Index:" + _)((a, b) => ("Value:" + a, b))
b.collect
res0: Array[(String, String)] = Array((Value:1,Index:0), (Value:2,Index:0), (Value:3,Index:0), (Value:4,Index:1), (Value:5,Index:1), (Value:6,Index:1), (Value:7,Index:2), (Value:8,Index:2), (Value:9,Index)
max
val y = sc.parallelize(10 to 30)
y.max
res75: Int = 30
mean [Double]
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.mean
res0: Double = 5.3
min
val y = sc.parallelize(10 to 30)
y.min
res75: Int = 10
name, setName
val y = sc.parallelize(1 to 10, 10)
y.name
res13: String = null
y.setName("Fancy RDD Name")
y.name
res15: String = Fancy RDD Name
val b = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) b.partitions res48: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@18aa, org.apache.spark.rdd.ParallelCollectionPartition@18ab) |
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.getStorageLevel res0: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1) c.cache c.getStorageLevel res2: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1) |
pipe
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)
randomSplit
val y = sc.parallelize(1 to 10)
val splits = y.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)
training.collect
res85: Array[Int] = Array(1, 4, 5, 6, 8, 10)
test.collect
res86: Array[Int] = Array(2, 3, 7, 9)

val y = sc.parallelize(1 to 10)
val splits = y.randomSplit(Array(0.1, 0.3, 0.6))
val rdd1 = splits(0)
val rdd2 = splits(1)
val rdd3 = splits(2)
rdd1.collect
res87: Array[Int] = Array(4, 10)
rdd2.collect
res88: Array[Int] = Array(1, 3, 5, 8)
rdd3.collect
res91: Array[Int] = Array(2, 6, 7, 9)
reduce
val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res86: Array[(Int, String)] = Array((3,dogcatowlgnuant)) val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey(_ + _).collect res87: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle)) |
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3) val b = a.keyBy(_.length) val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3) val d = c.keyBy(_.length) b.rightOuterJoin(d).collect res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear))) |
sample
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960
a.sample(true, 0.3, 0).count
res25: Long = 2888
a.sample(true, 0.3, 13).count
res26: Long = 2985
saveAsObjectFile
val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("objFile")
val y = sc.objectFile[Array[Int]]("objFile")
y.collect
res52: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2) v.saveAsSequenceFile("hd_seq_file") 14/04/19 05:45:43 INFO FileOutputCommitter: Saved output of task 'attempt_201404190545_0000_m_000001_191' to file:/home/cloudera/hd_seq_file [cloudera@localhost ~]$ ll ~/hd_seq_file total 8 -rwxr-xr-x 1 cloudera cloudera 117 Apr 19 05:45 part-00000 -rwxr-xr-x 1 cloudera cloudera 133 Apr 19 05:45 part-00001 -rwxr-xr-x 1 cloudera cloudera 0 Apr 19 05:45 _SUCCESS |
saveAsTextFile
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
14/04/03 21:11:36 INFO FileOutputCommitter: Saved output of task 'attempt_201404032111_0000_m_000002_71' to file:/home/cloudera/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a

[cloudera@localhost ~]$ head -n 5 ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/part-00000
1
2
3
4
5

// Produces 3 output files since we have created the RDD with 3 partitions
[cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_a/
-rwxr-xr-x 1 cloudera cloudera 15558 Apr 3 21:11 part-00000
-rwxr-xr-x 1 cloudera cloudera 16665 Apr 3 21:11 part-00001
-rwxr-xr-x 1 cloudera cloudera 16671 Apr 3 21:11 part-00002

Saving with a compression codec:
import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])

[cloudera@localhost ~]$ ll ~/Documents/spark-0.9.0-incubating-bin-cdh4/bin/mydata_b/
total 24
-rwxr-xr-x 1 cloudera cloudera 7276 Apr 3 21:29 part-00000.gz
-rwxr-xr-x 1 cloudera cloudera 6517 Apr 3 21:29 part-00001.gz
-rwxr-xr-x 1 cloudera cloudera 6525 Apr 3 21:29 part-00002.gz

val x = sc.textFile("mydata_b")
x.count
res2: Long = 10000

Writing into and reading from HDFS:
val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)
x.saveAsTextFile("hdfs://localhost:8020/user/cloudera/test")

val sp = sc.textFile("hdfs://localhost:8020/user/cloudera/sp_data")
sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/cloudera/sp_x")
stats [Double]
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.stats
res16: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859)
sortBy
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res101: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect
res102: Array[Int] = Array(7, 5, 3, 2, 1, 1)

val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res109: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(c => c._2, true).collect
res108: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5)) val a = sc.parallelize(1 to 100, 5) val b = a.cartesian(a) val c = sc.parallelize(b.takeSample(true, 5, 13), 2) val d = c.sortByKey(false) res56: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4)) |
stdev [Double], sampleStdev [Double]
val d = sc.parallelize(List(0.0, 0.0, 0.0), 3)
d.stdev
res10: Double = 0.0
d.sampleStdev
res11: Double = 0.0

val d = sc.parallelize(List(0.0, 1.0), 3)
d.stdev
res18: Double = 0.5
d.sampleStdev
res19: Double = 0.7071067811865476

val d = sc.parallelize(List(0.0, 0.0, 1.0), 3)
d.stdev
res14: Double = 0.4714045207910317
d.sampleStdev
res15: Double = 0.5773502691896257
subtract
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.collect
res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2) val b = a.keyBy(_.length) val c = sc.parallelize(List("ant", "falcon", "squid"), 2) val d = c.keyBy(_.length) b.subtractByKey(d).collect res15: Array[(Int, String)] = Array((4,lion)) |
sum [Double]
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum
res17: Double = 101.39999999999999
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2) b.take(2) res18: Array[String] = Array(dog, cat) val b = sc.parallelize(1 to 10000, 5000) b.take(100) res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100) |
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2) b.takeOrdered(2) res19: Array[String] = Array(ape, cat) |
takeSample
val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 100, 1)
res3: Array[Int] = Array(339, 718, 810, 105, 71, 268, 333, 360, 341, 300, 68, 848, 431, 449, 773, 172, 802, 339, 431, 285, 937, 301, 167, 69, 330, 864, 40, 645, 65, 349, 613, 468, 982, 314, 160, 675, 232, 794, 577, 571, 805, 317, 136, 860, 522, 45, 628, 178, 321, 482, 657, 114, 332, 728, 901, 290, 175, 876, 227, 130, 863, 773, 559, 301, 694, 460, 839, 952, 664, 851, 260, 729, 823, 880, 792, 964, 614, 821, 683, 364, 80, 875, 813, 951, 663, 344, 546, 918, 436, 451, 397, 670, 756, 512, 391, 70, 213, 896, 123, 858)
toDebugString
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString
res6: String =
MappedRDD[15] at subtract at <console>:16 (3 partitions)
  SubtractedRDD[14] at subtract at <console>:16 (3 partitions)
    MappedRDD[12] at subtract at <console>:16 (3 partitions)
      ParallelCollectionRDD[10] at parallelize at <console>:12 (3 partitions)
    MappedRDD[13] at subtract at <console>:16 (3 partitions)
      ParallelCollectionRDD[11] at parallelize at <console>:12 (3 partitions)
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2) c.toJavaRDD res3: org.apache.spark.api.java.JavaRDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:12 |
top
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res28: Array[Int] = Array(9, 8)
toString
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toString
res7: String = MappedRDD[15] at subtract at <console>:16
union, ++
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
unpersist
val y = sc.parallelize(1 to 10, 10)
val z = (y++y)
z.collect
z.unpersist(true)
14/04/19 03:04:57 INFO UnionRDD: Removing RDD 22 from persistence list
14/04/19 03:04:57 INFO BlockManager: Removing RDD 22
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) val b = a.map(x => (x.length, x)) b.values.collect res3: Array[String] = Array(dog, tiger, lion, cat, panther, eagle) |
variance [Double], sampleVariance [Double]
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.variance
res70: Double = 10.605333333333332

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.variance
res14: Double = 66.04584444444443
x.sampleVariance
res13: Double = 74.30157499999999
zip
val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104), (5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112), (13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119), (20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126), (27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133), (34,134), (35,135), (36,136), (37,137), (38,138), (39,139), (40,140), (41,141), (42,142), (43,143), (44,144), (45,145), (46,146), (47,147), (48,148), (49,149), (50,150), (51,151), (52,152), (53,153), (54,154), (55,155), (56,156), (57,157), (58,158), (59,159), (60,160), (61,161), (62,162), (63,163), (64,164), (65,165), (66,166), (67,167), (68,168), (69,169), (70,170), (71,171), (72,172), (73,173), (74,174), (75,175), (76,176), (77,177), (78,...

val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
val c = sc.parallelize(201 to 300, 3)
a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect
res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202), (3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207), (8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212), (13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217), (18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222), (23,123,223), (24,124,224), (25,125,225), (26,126,226), (27,127,227), (28,128,228), (29,129,229), (30,130,230), (31,131,231), (32,132,232), (33,133,233), (34,134,234), (35,135,235), (36,136,236), (37,137,237), (38,138,238), (39,139,239), (40,140,240), (41,141,241), (42,142,242), (43,143,243), (44,144,244), (45,145,245), (46,146,246), (47,147,247), (48,148,248), (49,149,249), (50,150,250), (51,151,251), (52,152,252), (53,153,253), (54,154,254), (55,155,255)...
zipPartitions
val a = sc.parallelize(0 to 9, 3)
val b = sc.parallelize(10 to 19, 3)
val c = sc.parallelize(100 to 109, 3)
def myfunc(aiter: Iterator[Int], biter: Iterator[Int], citer: Iterator[Int]): Iterator[String] = {
  var res = List[String]()
  while (aiter.hasNext && biter.hasNext && citer.hasNext) {
    val x = aiter.next + " " + biter.next + " " + citer.next
    res ::= x
  }
  res.iterator
}
a.zipPartitions(b, c)(myfunc).collect
res50: Array[String] = Array(2 12 102, 1 11 101, 0 10 100, 5 15 105, 4 14 104, 3 13 103, 9 19 109, 8 18 108, 7 17 107, 6 16 106)
val z = sc.parallelize(Array("A", "B", "C", "D")) val r = z.zipWithIndex res110: Array[(String, Long)] = Array((A,0), (B,1), (C,2), (D,3)) val z = sc.parallelize(100 to 120, 5) val r = z.zipWithIndex r.collect res11: Array[(Int, Long)] = Array((100,0), (101,1), (102,2), (103,3), (104,4), (105,5), (106,6), (107,7), (108,8), (109,9), (110,10), (111,11), (112,12), (113,13), (114,14), (115,15), (116,16), (117,17), (118,18), (119,19), (120,20)) |
zipWithUniqueId
val z = sc.parallelize(100 to 120, 5)
val r = z.zipWithUniqueId
r.collect
res12: Array[(Int, Long)] = Array((100,0), (101,5), (102,10), (103,15), (104,1), (105,6), (106,11), (107,16), (108,2), (109,7), (110,12), (111,17), (112,3), (113,8), (114,13), (115,18), (116,4), (117,9), (118,14), (119,19), (120,24))