当前位置:   article > 正文

大数据-计算引擎-Spark(三):RDD编程【离线分析;替代MapReduce编程,使用RDD(弹性分布式数据集)编程;处理非结构化数据;RDD操作算子:transformation、Action】_spark离线分析

spark离线分析

一、Windows环境配置

1、Windows下配置Hadoop环境

1.1 配置JAVA_HOME

在这里插入图片描述

1.2 配置HADOOP_HOME

解压win10的hadoop jar包hadoop-2.7.2.zip,并配置HADOOP_HOME环境变量。安装时注意,最好不要安装到带有空格的路径名下,例如:Programe Files,否则在配置Hadoop的配置文件时会找不到JDK(按相关说法,配置文件中的路径加引号即可解决,但我没测试成功)。
在这里插入图片描述
path添加%HADOOP_HOME%\bin(win10不用分号或者如下编辑界面不用分号,其余加上 ;)
在这里插入图片描述

1.3 配置SCALA_HOME

解压scala-2.11.8.zip到安装目录,配置SCALA_HOME环境变量
在这里插入图片描述
path添加%HADOOP_HOME%\bin
在这里插入图片描述

1.4 IntellJ配置Scala

在这里插入图片描述

二、RDD 概述

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象。

RDD在代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

1、理解 RDD

  • 一个 RDD 可以简单的理解为一个分布式的元素集合.
  • RDD 表示只读的分区的数据集,对 RDD 进行改动,只能通过 RDD 的转换操作, 然后得到新的 RDD, 并不会对原 RDD 有任何的影响
  • 在 Spark 中, 所有的工作要么是创建 RDD, 要么是转换已经存在 RDD 成为新的 RDD, 要么在 RDD 上去执行一些操作来得到一些计算结果.
  • 每个 RDD 被切分成多个分区(partition), 每个分区可能会在集群中不同的节点上进行计算.
  • RDD 是只读的,要想改变 RDD 中的数据,只能在现有 RDD 基础上创建新的 RDD。
  • 由一个 RDD 转换到另一个 RDD,可以通过丰富的转换算子实现,不再像 MapReduce 那样只能写map和reduce了。

2、RDD 的 5 个主要属性(property)

2.1 A list of partitions

  • 一系列分区,分区有编号,有顺序的
  • 多个分区. 分区可以看成是数据集的基本组成单位.
  • 对于 RDD 来说, 每个分区都会被一个计算任务处理, 并决定了并行计算的粒度.
  • 用户可以在创建 RDD 时指定 RDD 的分区数, 如果没有指定, 那么就会采用默认值. 默认值就是程序所分配到的 CPU Coure 的数目.
  • 每个分配的存储是由BlockManager 实现的. 每个分区都会被逻辑映射成 BlockManager 的一个 Block, 而这个 Block 会被一个 Task 负责计算.

2.2 A function for computing each split

  • 每一个切片都会有一个函数作业在上面用于对数据进行处理
  • 计算每个切片(分区)的函数.
  • Spark 中 RDD 的计算是以分片为单位的, 每个 RDD 都会实现 compute 函数以达到这个目的.

2.3 A list of dependencies on other RDDs

  • 与其他 RDD 之间的依赖关系
  • RDD 的每次转换都会生成一个新的 RDD, 所以 RDD 之间会形成类似于流水线一样的前后依赖关系.
  • 在部分分区数据丢失时, Spark 可以通过这个依赖关系重新计算丢失的分区数据, 而不是对 RDD 的所有分区进行重新计算.
  • RDD和RDD之间存在依赖关系,为了容错(重算,cache,checkpoint),也就是说在内存中的RDD操作时出错或丢失会进行重算

2.4 a Partitioner for key-value RDDs

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • 对存储键值对的 RDD, 还有一个可选的分区器.
  • 只有对于 key-value的 RDD, 才会有 Partitioner, 非key-value的 RDD 的 Partitioner 的值是 None. Partitiner 不但决定了 RDD 的本区数量, 也决定了 parent RDD Shuffle 输出时的分区数量.
  • 可选,key value类型的RDD才有RDD[(K,V)],如果是kv类型的RDD,会有一个分区器,默认是hash-partitioned
  • 如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区,例如这里自定义的Partitioner是基于key进行分区,那则会将不同RDD里面的相同key的数据放到同一个partition里面

2.5 preferred locations

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
  • 存储每个切片优先(preferred location)位置的列表. 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置. 按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置.
  • 可选,如果是从HDFS读取数据,会得到数据的最优位置(向NameNode请求元数据)去计算,也就是数据的本地性

三、RDD编程

1、RDD 编程模型

  • 在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。
  • 经过一系列的transformations定义 RDD 之后,就可以调用 actions 触发 RDD 的计算
  • action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。
  • 在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
  • 要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker
  • Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。
  • 具体步骤:
    1. 创建一个SparkContext
    2. 从数据源得到一个RDD
    3. 对RDD做各种转换
    4. 执行一个行动算子 (collect: 把各个节点计算后的数据, 拉取到驱动端)
    5. 关闭SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}


object Hello_Local {
   
    def main(args: Array[String]): Unit = {
   
        // 1. 创建一个SparkContext  打包的时候, 把本地模式master的设置方式【.setMaster("local[2]")】去掉, 在提交的时候使用 --maser 来设置master
        val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("Hello")
        val sc: SparkContext = new SparkContext(conf)
        // 2. 从数据源得到一个RDD
        val list = List("hello world","hello Tom","hello China","Tom is a boy")
        val lineRDD: RDD[String] = sc.parallelize(list)
        // 3. 对RDD做各种转换
        val resultRDD: RDD[(String, Int)] = lineRDD.flatMap(_.split("\\W")).map((_, 1)).reduceByKey(_ + _)
        // 4. 执行一个行动算子   (collect: 把各个节点计算后的数据, 拉取到驱动端)
        val wordCountArr = resultRDD.collect()
        wordCountArr.foreach(println)
        // 5. 关闭SparkContext
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

打印结果:

(is,1)
(Tom,2)
(hello,3)
(boy,1)
(world,1)
(China,1)
(a,1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2、RDD 的创建

在 Spark 中创建 RDD 的方式可以分为 3 种:

  • 从集合中创建 RDD
  • 从外部存储创建 RDD
  • 从其他 RDD 转换得到新的 RDD。

2.1 从集合中创建 RDD

  • 使用parallelize函数创建
    scala> val arr = Array(10,20,30,40,50,60)
    arr: Array[Int] = Array(10, 20, 30, 40, 50, 60)
    
    scala> val rdd1 = sc.parallelize(arr)
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 使用makeRDD函数创建
    scala> val rdd1 = sc.makeRDD(Array(10,20,30,40,50,60))
    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24
    
    • 1
    • 2
  • 一旦 RDD 创建成功, 就可以通过并行的方式去操作这个分布式的数据集了.
  • parallelize和makeRDD还有一个重要的参数就是把数据集切分成的分区数.
  • Spark 会为每个分区运行一个任务(task). 正常情况下, Spark 会自动的根据你的集群来设置分区数

2.2 从外部存储创建 RDD

  • Spark 也可以从任意 Hadoop 支持的存储数据源来创建分布式数据集.
  • 外部存储设备可以是:
    • 本地文件系统
    • HDFS
    • Cassandra
    • HVase,
    • Amazon S3 等等
  • Spark 支持 文本文件, SequenceFiles, 和其他所有的 Hadoop InputFormat.
    scala> var distFile = sc.textFile("words.txt")
    distFile: org.apache.spark.rdd.RDD[String] = words.txt MapPartitionsRDD[1] at textFile at <console>:24
    
    • 1
    • 2
    scala> distFile.collect
    res0: Array[String] = Array(atguigu hello, hello world, how are you, abc efg)
    
    • 1
    • 2
  1. url可以是本地文件系统文件, hdfs://…, s3n://…等等
  2. 如果是使用的本地文件系统的路径, 则必须每个节点都要存在这个路径
  3. 所有基于文件的方法, 都支持目录, 压缩文件, 和通配符(). 例如:
    textFile(“/my/directory”), textFile("/my/directory/
    .txt"), and textFile(“/my/directory/*.gz”).
  4. textFile还可以有第二个参数, 表示分区数. 默认情况下, 每个块对应一个分区.(对 HDFS 来说, 块大小默认是 128M). 可以传递一个大于块数的分区数, 但是不能传递一个比块数小的分区数.

2.3 从其他 RDD 转换得到新的 RDD

就是通过 RDD 的各种转换算子来得到新的 RDD.

四、RDD操作算子:transformation算子

RDD 的操作算子包括两类,

  • 一类叫做transformation,它是用来将 RDD 进行转化,构建 RDD 的血缘关系;
  • 另一类叫做action,它是用来触发 RDD 进行计算,得到 RDD 的相关计算结果或者 保存 RDD 数据到文件系统中.
Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

1、Value 类型

1.1 map(func)

返回一个新的分布式数据集,由每个原元素经过func函数处理后的新元素组成

import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object MapDemo1 {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("MapDemo1").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 70, 60, 10, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1)
        val rdd2 = rdd1.map(_ * 2)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出结果:

60
100
140
120
20
40
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.2 mapPartitions(func)

类似于map,对RDD的每个分区起作用,在类型为T的RDD上运行时,func的函数类型必须是Iterator[T]=>Iterator[U]

import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object MapPartitions {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("MapPartitions").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 70, 60, 10, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1, 4) // 分成4个分区
        val rdd2 = rdd1.mapPartitions((it: Iterator[Int]) => {
   
            println("abc")  // 每个分区执行一次
            it.map(_ * 2)
        })
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

输出结果:

[Stage 0:> (0 + 0) / 4]
abc
abc
abc
abc
60
100
140
120
20
40
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

1.3 mapPartitionsWithIndex(func)

和mapPartitions类似,但func带有一个整数参数表上分区的索引值,在类型为T的RDD上运行时,func的函数参数类型必须是(int,Iterator[T])=>Iterator[U] sample(withReplacement,fraction,seed) 根据给定的随机种子seed,随机抽样出数量为fraction的数据

import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object MapPartitionsWithIndex {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("MapPartitionsWithIndex").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 70, 60, 10, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1)
        // (30, 0), (50, 0)...
        val rdd2 = rdd1.mapPartitionsWithIndex((index, it) => {
   
            it.map(x => (index, x))
            //            it.map((index, _))
        })
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

输出结果:

(0,30)
(0,50)
(0,70#pic_center =600x)
(1,60)
(1,10)
(1,20)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.4 flatMap(func)

类似于map,但是每一个输入元素,会被映射为0个或多个输出元素,(因此,func函数的返回值是一个seq,而不是单一元素)

import org.apache.spark.{
   SparkConf, SparkContext}

object FlatMap {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("FlatMap").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 5, 70, 6, 1, 20)
        val rdd1 = sc.parallelize(list1)
        // rdd2只要偶数和偶数的平方,三次方
        val rdd2 = rdd1.flatMap(x => if (x % 2 == 0) List(x, x * x, x * x * x) else List[Int]())
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出结果:

30
900
27000
70
4900
343000
6
36
216
20
400
8000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

1.5 filter(func)

返回一个新的数据集,由经过func函数处理后返回值为true的原元素组成

import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object Filter {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Filter").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 70, 60, 10, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1, 2)
        val rdd2 = rdd1.filter(x => x > 20)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出结果:

30
50
70
60
  • 1
  • 2
  • 3
  • 4

1.6 glom()

作用: 将每一个分区的元素合并成一个数组,形成新的 RDD 类型是RDD[Array[T]]

import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object Glom {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Glom").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 70, 60, 10, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1, 3)
        val rdd2 = rdd1.glom().map(_.toList)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

输出结果:

List(30, 50)
List(70, 60)
List(10, 20)
  • 1
  • 2
  • 3

1.7 groupBy()

  • 按照func的返回值进行分组.
  • func返回值作为 key, 对应的值放入一个迭代器中. 返回的 RDD: RDD[(K, Iterable[T])
  • 每组内元素的顺序不能保证, 并且甚至每次调用得到的顺序也有可能不同.
import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object GroupBy {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("GroupBy").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list1 = List(30, 50, 7, 6, 1, 20)
        val rdd1: RDD[Int] = sc.parallelize(list1, 2)
        val rdd2 = rdd1.groupBy(x => x % 2)
        rdd2.collect.foreach(println)
        val rdd3 = rdd2.map {
   
            case (k, it) => (k, it.sum)
        }
        rdd3.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

输出结果:

(0,CompactBuffer(30, 50, 6, 20))
(1,CompactBuffer(7, 1))
(0,106)
(1,8)
  • 1
  • 2
  • 3
  • 4

1.7 sample(withReplacement, fraction, seed)

从RDD中进行抽样

  1. 以指定的随机种子随机抽样出比例为fraction的数据,(抽取到的数量是: size * fraction). 需要注意的是得到的结果并不能保证准确的比例.
  2. withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样. 放回表示数据有可能会被重复抽取到, false 则不可能重复抽取到. 如果是false, 则fraction必须是:[0,1], 是 true 则大于等于0就可以了.
  3. seed用于指定随机数生成器种子。 一般用默认的, 或者传入当前的时间戳
import org.apache.spark.rdd.RDD
import org.apache.spark.{
   SparkConf, SparkContext}

object Sample {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Sample").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val range = 1 to 20
        val rdd1: RDD[Int] = sc.parallelize(range, 2)
        // 参数1: 表示是否放回抽样 (false, 比例[0, 1])   (true, [0, 无穷))
        // val rdd2 = rdd1.sample(true, 2)	// 放回抽样,每个样品可以抽多次,2只是一个大致的比例,抽取的数量不是严格按照此数量抽取
        val rdd2 = rdd1.sample(false, 0.2)	// 不放回抽样,0.2只是一个大致的比例,抽取的数量不是严格按照此比例
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

输出结果:

4
10
11
15
19
20
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.8 pipe(command,[envVars]) --Linux环境使用

作用: 管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的RDD。一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令.

注意:脚本要放在 worker 节点可以访问到的位置

步骤1: 创建一个脚本文件pipe.sh,文件内容如下:

echo "hello"
while read line;do
    echo ">>>"$line
done
  • 1
  • 2
  • 3
  • 4

步骤2: 创建只有 1 个分区的RDD

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 1)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.pipe("./pipe.sh").collect
res1: Array[String] = Array(hello, >>>10, >>>20, >>>30, >>>40)
  • 1
  • 2
  • 3
  • 4
  • 5

步骤3: 创建有 2 个分区的 RDD

scala> val rdd1 = sc.parallelize(Array(10,20,30,40), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> rdd1.pipe("./pipe.sh").collect
res2: Array[String] = Array(hello, >>>10, >>>20, hello, >>>30, >>>40)
  • 1
  • 2
  • 3
  • 4
  • 5

1.9 union(otherDataSet)

返回一个新的数据集,由原数据集合参数联合而成

1.10 intersection(otherDataset)

求两个RDD的交集

1.11 distinct([numtasks])

返回一个包含源数据集中所有不重复元素的i新数据集

对 RDD 中元素执行去重操作. 参数表示任务的数量.默认值和分区数保持一致.

import org.apache.spark.{
   SparkConf, SparkContext}

object Distinct00 {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(20, 30, 50, 70, 60, 10, 20, 70#pic_center =600x)
        val rdd1 = sc.parallelize(list)
        val rdd2 = rdd1.distinct(2)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

输出结果:

30
50
70
20
60
10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
import org.apache.spark.{
   SparkConf, SparkContext}

case class User(age: Int, name: String){
   
    override def hashCode(): Int = this.age
    override def equals(obj: Any): Boolean = obj match {
   
        case User(age, _) => this.age == age
        case _ => false
    }
}
object Distinct {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
        val rdd2 = rdd1.distinct(2)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

输出结果:

User(20,zs)
User(10,lisi)
  • 1
  • 2
import org.apache.spark.{
   SparkConf, SparkContext}

case class User(age: Int, name: String)
object Distinct03 {
   
    def main(args: Array[String]): Unit = {
   
        val conf: SparkConf = new SparkConf().setAppName("Distinct").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1 = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")))
        val rdd2 = rdd1.distinct(2)
        rdd2.collect.foreach(println)
        sc.stop()
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

输出结果:

User(20,zs)
User(10,lisi)
  • 1
  • 2

1.12 Sortby

数字降序

import org.apache.spark.{
   SparkConf, SparkContext}

object SortBy {
   
    def main(args: Array[String])
  • 1
  • 2
  • 3
  • 4
  • 5
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/人工智能uu/article/detail/736365
推荐阅读
相关标签
  

闽ICP备14008679号