On Windows 10, unzip the Hadoop package hadoop-2.7.2.zip and set the HADOOP_HOME environment variable. Avoid installing it under a path that contains spaces (e.g. Program Files), otherwise the JDK cannot be found when Hadoop's configuration files are read (supposedly quoting the path in the configuration files fixes this, but I could not get that to work).
Add %HADOOP_HOME%\bin to Path (no semicolon is needed in Windows 10's list-style Path editor; in the single-line editor append a ;).
Unzip scala-2.11.8.zip to the installation directory and set the SCALA_HOME environment variable.
Add %SCALA_HOME%\bin to Path.
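As a quick sanity check, both variables can be read back from the JVM. A minimal sketch (EnvCheck is a hypothetical helper, not part of the setup above):

```scala
// Minimal sketch: print the environment variables configured above.
object EnvCheck {
  def main(args: Array[String]): Unit = {
    println(s"HADOOP_HOME = ${sys.env.getOrElse("HADOOP_HOME", "<not set>")}")
    println(s"SCALA_HOME  = ${sys.env.getOrElse("SCALA_HOME", "<not set>")}")
  }
}
```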
RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark.
In code, RDD is an abstract class; it represents a resilient, immutable, partitioned collection whose elements can be computed in parallel.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Hello_Local {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkContext. When packaging for cluster submission, remove
    //    .setMaster("local[2]") and set the master with --master on spark-submit.
    val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Hello")
    val sc: SparkContext = new SparkContext(conf)
    // 2. Get an RDD from a data source
    val list = List("hello world", "hello Tom", "hello China", "Tom is a boy")
    val lineRDD: RDD[String] = sc.parallelize(list)
    // 3. Apply transformations to the RDD
    val resultRDD: RDD[(String, Int)] = lineRDD.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
    // 4. Execute an action (collect pulls the data computed on each node back to the driver)
    val wordCountArr = resultRDD.collect()
    wordCountArr.foreach(println)
    // 5. Stop the SparkContext
    sc.stop()
  }
}
```
Output:
(is,1)
(Tom,2)
(hello,3)
(boy,1)
(world,1)
(China,1)
(a,1)
There are three ways to create an RDD in Spark: from an in-memory collection, from external storage, or from an existing RDD.
1) From an in-memory collection, using parallelize or makeRDD:
scala> val arr = Array(10,20,30,40,50,60)
arr: Array[Int] = Array(10, 20, 30, 40, 50, 60)
scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val rdd1 = sc.makeRDD(Array(10,20,30,40,50,60))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
2) From external storage (e.g. a text file), using textFile:
scala> var distFile = sc.textFile("words.txt")
distFile: org.apache.spark.rdd.RDD[String] = words.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> distFile.collect
res0: Array[String] = Array(atguigu hello, hello world, how are you, abc efg)
3) From an existing RDD: new RDDs are obtained by applying the various transformation operators to an existing RDD.
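A minimal sketch of this third way (assuming a SparkContext sc is available, as in the shell sessions above):

```scala
// Minimal sketch: a new RDD derived from an existing one through a transformation.
val rdd1 = sc.parallelize(Array(10, 20, 30, 40, 50, 60))
val rdd2 = rdd1.map(_ * 2) // rdd2 is a new RDD; nothing runs until an action is called
```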
RDD operators fall into two categories: transformations, which lazily describe a new RDD, and actions, which trigger the actual computation. The common transformations are listed below:
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. |
filter(func) | Return a new dataset formed by selecting those elements of the source on which func returns true. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. |
distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance (see the sketch after this table). |
reduceByKey(func, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument. |
sortByKey([ascending], [numPartitions]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument. |
join(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. |
cogroup(otherDataset, [numPartitions]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith. |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings. |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. |
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery. |
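To illustrate the note on groupByKey above, here is a minimal sketch comparing groupByKey with reduceByKey for a per-key sum (assuming a SparkContext sc, as in the shell examples):

```scala
// Minimal sketch: the same per-key sum via groupByKey and via reduceByKey.
val pairs = sc.parallelize(List(("hello", 1), ("hello", 1), ("Tom", 1)))

// groupByKey shuffles every (key, value) pair and sums on the reduce side
val viaGroup = pairs.groupByKey().map { case (k, vs) => (k, vs.sum) }

// reduceByKey pre-aggregates within each partition, so less data crosses the network
val viaReduce = pairs.reduceByKey(_ + _)

viaGroup.collect().foreach(println)  // e.g. (hello,2), (Tom,1)
viaReduce.collect().foreach(println) // e.g. (hello,2), (Tom,1)
```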
map(func): returns a new distributed dataset formed by applying the function func to each element of the source.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MapDemo1").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1)
    val rdd2 = rdd1.map(_ * 2)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
60
100
140
120
20
40
mapPartitions(func): similar to map, but operates on each partition of the RDD; when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MapPartitions").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1, 4) // split into 4 partitions
    val rdd2 = rdd1.mapPartitions((it: Iterator[Int]) => {
      println("abc") // executed once per partition
      it.map(_ * 2)
    })
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
[Stage 0:> (0 + 0) / 4]
abc
abc
abc
abc
60
100
140
120
20
40
mapPartitionsWithIndex(func): similar to mapPartitions, but func also receives an integer representing the partition index; when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("MapPartitionsWithIndex").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1)
    // pair every element with the index of its partition: (0, 30), (0, 50)...
    val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
      it.map(x => (index, x)) // equivalently: it.map((index, _))
    })
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
(0,30)
(0,50)
(0,70)
(1,60)
(1,10)
(1,20)
flatMap(func): similar to map, but each input element can be mapped to 0 or more output elements (so func returns a Seq rather than a single element).
```scala
import org.apache.spark.{SparkConf, SparkContext}

object FlatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("FlatMap").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 5, 70, 6, 1, 20)
    val rdd1 = sc.parallelize(list1)
    // keep only the even numbers, emitting each one together with its square and cube
    val rdd2 = rdd1.flatMap(x => if (x % 2 == 0) List(x, x * x, x * x * x) else List[Int]())
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
30
900
27000
70
4900
343000
6
36
216
20
400
8000
filter(func): returns a new dataset formed by the source elements for which func returns true.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Filter").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1, 2)
    val rdd2 = rdd1.filter(x => x > 20)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
30
50
70
60
glom(): merges the elements of each partition into an array, producing a new RDD of type RDD[Array[T]].
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Glom {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Glom").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 70, 60, 10, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1, 3)
    val rdd2 = rdd1.glom().map(_.toList)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
List(30, 50)
List(70, 60)
List(10, 20)
groupBy(f): groups the elements of the RDD by the key returned by f, pairing each key with the Iterable of elements mapped to it.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GroupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("GroupBy").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list1 = List(30, 50, 7, 6, 1, 20)
    val rdd1: RDD[Int] = sc.parallelize(list1, 2)
    // group by parity: key 0 for even numbers, key 1 for odd numbers
    val rdd2 = rdd1.groupBy(x => x % 2)
    rdd2.collect.foreach(println)
    // sum the values within each group
    val rdd3 = rdd2.map {
      case (k, it) => (k, it.sum)
    }
    rdd3.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
(0,CompactBuffer(30, 50, 6, 20))
(1,CompactBuffer(7, 1))
(0,106)
(1,8)
sample(withReplacement, fraction, seed): draws a random sample from the RDD using the given random seed.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Sample").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val range = 1 to 20
    val rdd1: RDD[Int] = sc.parallelize(range, 2)
    // First argument: whether to sample with replacement.
    //   false -> fraction is the probability of selecting each element, in [0, 1]
    //   true  -> fraction is the expected number of times each element is chosen, in [0, ∞)
    // val rdd2 = rdd1.sample(true, 2)  // with replacement; an element may be drawn several times; 2 is only an approximate rate
    val rdd2 = rdd1.sample(false, 0.2)  // without replacement; 0.2 is only an approximate fraction, not an exact count
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
4
10
11
15
19
20
pipe(command, [envVars]): pipes each partition of the RDD through a shell command or script. The RDD's elements are written to the process's stdin, and the lines it writes to stdout become a new RDD of strings. The command runs once per partition, so an RDD with a single partition runs it exactly once.
Note: the script must be placed where the worker nodes can access it.
Step 1: create a script file pipe.sh with the following content:
```sh
echo "hello"
while read line; do
  echo ">>>"$line
done
```
Step 2: create an RDD with a single partition
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd1.pipe("./pipe.sh").collect
res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)
Step 3: create an RDD with 2 partitions
scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd1.pipe("./pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)
union(otherDataset): returns a new dataset containing the union of the elements of the source dataset and the argument (duplicates are kept).
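A minimal union sketch (assuming a SparkContext sc, as in the shell examples):

```scala
// Minimal sketch: union keeps every element from both RDDs, including duplicates.
val rdd1 = sc.parallelize(List(1, 2, 3, 4))
val rdd2 = sc.parallelize(List(3, 4, 5, 6))
rdd1.union(rdd2).collect().foreach(println) // 1 2 3 4 3 4 5 6 (order follows the partitions)
```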
intersection(otherDataset): returns a new RDD containing the intersection of the elements of the two RDDs.
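Similarly, a minimal intersection sketch (again assuming a SparkContext sc):

```scala
// Minimal sketch: intersection keeps only the elements present in both RDDs, deduplicated.
val rdd1 = sc.parallelize(List(1, 2, 3, 4))
val rdd2 = sc.parallelize(List(3, 4, 5, 6))
rdd1.intersection(rdd2).collect().foreach(println) // 3 and 4, order not guaranteed
```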
distinct([numPartitions]): returns a new dataset containing the distinct elements of the source dataset.
It removes duplicates from the RDD; the optional argument sets the number of tasks, which defaults to the number of partitions.
```scala
import org.apache.spark.{SparkConf, SparkContext}

object Distinct00 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val list = List(20, 30, 50, 70, 60, 10, 20, 70)
    val rdd1 = sc.parallelize(list)
    val rdd2 = rdd1.distinct(2)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output:
30
50
70
20
60
10
distinct relies on the elements' equals and hashCode. In this example both are overridden so that users are compared by age only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// equality is defined on age only, so two users with the same age count as duplicates
case class User(age: Int, name: String) {
  override def hashCode(): Int = this.age
  override def equals(obj: Any): Boolean = obj match {
    case User(age, _) => this.age == age
    case _            => false
  }
}

object Distinct {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
    val rdd2 = rdd1.distinct(2)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output (only one user per age remains):
User(20,zs)
User(10,lisi)
By contrast, with the default case class equality (age and name are both compared), none of the three users are equal:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// default case class equality: two users are equal only if both age and name match
case class User(age: Int, name: String)

object Distinct03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
    val rdd2 = rdd1.distinct(2)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```
Output (all three users are kept; the order may vary):
User(20,zs)
User(10,lisi)
User(10,ab)
sortBy(f, [ascending]): sorts the RDD by the key computed by f; here the numbers are sorted in descending order.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SortBy").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[Int] = sc.parallelize(List(30, 50, 70, 60, 10, 20))
    // sort by the element itself, in descending order
    val rdd2 = rdd1.sortBy(x => x, ascending = false)
    rdd2.collect.foreach(println)
    sc.stop()
  }
}
```