当前位置:   article > 正文

Spark期末知识合集_spark大数据技术与应用期末知识点总结

spark大数据技术与应用期末知识点总结

目录

Scala基础

一.定义与使用元组

1.map()方法 (一对一映射)

2.flatMap()方法 (一对多映射)

 3.groupBy()方法     

Spark编程基础

1. makeRDD()

2.使用 union()方法合并多个 RDD

3.使用 filter()方法进行过滤

4.使用 distinct()方法进行去重

5.使用map()方法转换数据

6.使用 sortBy)方法进行排序

7.使用 collect()方法查询数据

8.使用 flatMap)方法转换数据

Spark变成进阶

1.在集群环境中运行 Spark

Spark SQL ——结构化数据文化处理

 一.DataFrame简介

二.DataFrame的创建

1.数据准备

2.已存在的RDD调用toDF()方法转换得道DataFrame

三.DataFrame的常用操作

1.操作DataFrame的常用方法,具体如下表:

2.select实现队列进行重命名

3.selectExpr()方法指定字段特殊处理

4.filter()/where条件查询

5.groupBy()对数据进行分组


Scala基础

一.定义与使用元组

1.map()方法 (一对一映射)

 map()方法可通过一个函数重新计算列表中的所有元素,并且返回一个包含相同数目元素的新列表。例如,定义一个Int类型列表,列表中的元素为 1~5,使用 map()方法对列表中的元素进行平方计算

2.flatMap()方法 (一对多映射)

flatMap()方法结合了 map()方法和 flatten()方法的功能,接收一个可以处理嵌套列表的函数,再对返回结果进行连接,如下代码

 3.groupBy()方法     

groupBy ()方法可对集合中的元素进行分组操作,返回的结果是一个映射。对 1~10根据奇偶性进行分组,因此 groupBy ()方法传入的参数是一个计算偶数的函数,得到的结果是一个映射,包含两个键值对,键为 false 对应的值为奇数列表,键为true对应的值为偶数列表,如下代码

                

Spark编程基础

一.RDD简介

RDD 是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD有3种不同的创建方法。第一种是将程序中已存在的 Sea 集合(如集合、列表、数组)转换成RDD,第二种是对已有RDD 进行转换得到新的RDD,这两种方法都是通过内存中已有的数据创建RDD 的。第三种是直接读取外部存储系统的数据创建 RDD。


1. makeRDD()


makeRDDO方法有两种使用方式,第一种使用方式与 parallelize)方法一致;第二种方式是通过接收一个 Seq[(T, Seq[String])]参数类型创建 RDD。第二种方式生成的 RDD 中保存的是T的值,Seq[String]部分的数据会按照 Seg[(T, Seq[String)]的顺序存放到各个分区中,一个 Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations0方法可以根据位置信息查看每一个分区的值。调用 makeRDDO时不可以直接指定 RDD的分区个数,分区的个数与 Seq[String)参数的个数是保持一致的。使用 makeRDDO方法创建 RDD,并根据位置信息查看每一个分区的值。

使用 makeRDD()方法创建 RDD 并查看各分区的值
#定义一个序列 seg
val seq = Seg((1, Seq("iteblog.com", "sparkhostl.com")),
(3, Seq("itebolg.com", "sparkhost2.com")), (2,Seq("iteblog.com", "sparkhost3.com")))# 使用 makeRDD()方法创建RDD
val iteblog = sc.makeRDD(seg)
# 查看 RDD的值iteblog.collect # 查看分区个数
iteblog.partitioner
iteblog.partitions.size
# 根据位置信息查看每一个分区的值
iteblog.preferredLocations (iteblog.partitions (0))iteblog.preferredLocations (iteblog.partitions (1))iteblog.preferredLocations(iteblog.partitions(2))


2.使用 union()方法合并多个 RDD


union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两
个RDD 中每个元素中的值的个数、数据类型需要保持一致。创建两个存放二元组的 RDD, 通过 umion)方法合并两个RDD,不处理重复数据,并且每个二元组的值的个数、数据类型都是一致的。


3.使用 filter()方法进行过滤


fiter)方法是一种转换操作,用于过滤 RDD 中的元素。filter()方法需要一个参数,这个参数是一个用于过源的函数,该丽数的返回值为 Boolean 类型。filter()方法将返回值为 true 的元素保留,将返回值为 false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。
创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素,如代码 3-14所示。其中第一个 ilter0方法中使用了“.2”,第一个“”与第二个 filter)方法中的“x”一样,均表示RDD 的每一个元素。

 

4.使用 distinct()方法进行去重

distinct)方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没
有参数。创建一个带有重复数据的RDD,并使用 distinct)方法去重,如代码3-15 所示,通过 collect0方法查看结果。


5.使用map()方法转换数据


map()方法是一种基础的 RDD 转换操作,可以对RDD中的每一个数据元素通过某种丽数进行转换并返回新的RDD。map0方法是懒操作,不会立即进行计算。
转换操作是创建RDD 的第二种方法,通过转换已有 RDD生成新的 RDD。因为 RDD -个不可变的集合,所以如果对RDD 数据进行了某种转换,那么会生成一个新的 RDD。
例如,通过一个存放了5个 Int类型的数据元素的列表创建一个RDD,可通过map()方法对每一个元素进行平方运算,结果会生成一个新的RDD,如代码3-6所示。
map()方法示例
创建 RDD
val distData = sc.parallelize (List(I;,3, 45,3,76
# map)方法求平方值
val sq_dist = distData.nap(x=>x*x)


6.使用 sortBy)方法进行排序


sortBy()方法用于对标准RDD进行排序,有3个可输人参数,说明如下。
(1)第1个参数是一个函数(T)=>K,左边是要被排序对象中的每一个元素,右边返
回的值是元素中要进行排序的值。
(2)第2个参数是ascending, 决定排序后 RDD中的元素是升序的还是降序的,默认
是tue,即升序排序,如果需要降序排序则需要将参数的值设置为 falseo
(3)第3个参数是 wunPartitons, 块定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即 this,partitions.size.
第一个参数是必须输人的,而后面的两个参数可以不输人。例如,通过一个存放了3
个二元组的列表创建一个 RDD,对元组的第二个值进行降序排序,分区个数设置为1。
#.创建 RDD
val data = sc.parallelize(List((1, 3), (45,3), (7,6)))
#使用SoxtBy()方法对元组的第二个值进行降序排序,分区个数没置为1
val sort data - data.sortBy(x m> x.2, false,1)

7.使用 collect()方法查询数据


collect(方法是一种行动操作,可以将RDD 中所有元萦转换成数组并返回到 Dive端,
适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传当并目加裁到 Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此数据量较大时,尽量不使用collct0方法,否则可能导致Driver端出现内存益出问题。collegn 方法有以下两种操作方式。
(1)collect:直接调用collect 返回该RDD中的所有元索,返回类型是一个 Array[T]数组,这是较为常用的一种方式。
使用 collect()方法查看在sq_dist 和 sort_data的结果,分别返回了经过平方运算后的 Int类型的数组和对元组第二个值进行降序排列后的数组。
collect()方法示例
#查看 sq_dist 和 sort_data 的结果
sq_dist.collect
sort_data.collect
(2) collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数 one,用于将 collect 方法得到的数组中数值为1的值替换为“one",将其他值替换为“other”。创建一个只有3个 Iot 类型数据的RDD,在使用collec1()方法时将one 两数作为参数。
collecl(PartialFunction)方法示例
定义一个函数 one
val one:PartialrunctionlInt, stringl - (caso I > "one":case - -> "othe,
# 创建RDD
val data = sc, parallelize (List(2,3, 1))
#使用collect()方法,将 one 两数作为参数
data.collect (one).collect


8.使用 flatMap)方法转换数据


fatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的 RDD。使用 flatMap0方法时先进行 map(映射)再进行 flat (扁平化)操作,数据会先经过跟 map()方法一样的操作,为每一条输人返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的送代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。这个转换操作通常用来切分单词。
例如,分别用 map()方法和 flatMap()方法分割字符串。用map()方法分割后,每个元素
对应返回一个迭代器,即数组。flatMapO方法在进行同 map()方法一样的操作后,将3个迭代器的元素扁平化(压成同一级别),保存在新 RDD 中。
flatMap()方法示例
# 创建 RDD
val. test = sc.parallelize (List ("How are you", "I am fine", "what about you"))
# 查看 RDD
test.collect
#使用map分割字符电后,再查看 RDD
test.map(x=> x,split("")).collect
#使用latMap分割字符串后,再查看 RDD
test.flatMap(x => x.split("")).collect

Spark变成进阶

1.在集群环境中运行 Spark


直接在开发环境中运行 Spark 程序时通常选择的是本地模式。如果数据的规模比较庞大,更常用的方式还是在 Spark 程序开发测试完成后编译打包为Java 归档(Java Archive, JAR)包,并通过 spark-submit 命令提交到 Spark集群环境中运行。
spark-submit的脚本在 Spark安装目录的bin 目录下,spark-submit 是 Spark 为所有支特
的集群管理器提供的一个提交作业的工具。Spark 在/example 目录下有 Scala、Java、 Pyithon 和R的示例程序,都可以通过 spark-submit运行。


sporik-subomit 提交JAR 包到集群有一定的格式要求,需要设置一些参数,语法如下,

./bin/spark-submit --class <main-class>
--master <master-url>
--deploy-mode <deploy-mode>\
--conf <"key=value">
...# other options
<appli.cat.ion-jar> )
[application-argaments」

如果除了设置运行的脚本名称之外不设置其他参数,那么 Spark 程序默认在本地
运行。

--class:应用程序的入口,指主程序。
--master:指定要连接的集群URL。
-deploy-mode:是否将驱动程序部署在工作节点(cluster)或本地作为外部客户端
(client)。
--conf:设置任意 Spark 配登属性,允许使用"key=value WordCount"的格式设置任意的SparkConf 配置属性。
application-jar:包含应用程序和所有依赖关系的JAR包的路径。
application-arguments:传递给main)方法的参数。
将代码 4-3 所示的程序运行模式更改为打包到集样中运行。程序中无须设置 master 地址、Hadoop 安装包位置。输人、输出路径可通过 spark-submit 指定。
spark-submit常用项配置

ndme Name    设置程序名
--jars JARS   添加依赖包
-driver-memory MEM    Driver 程序使用的内存大小
-executor-memory MEM    Executor使用的内存大小
-total-executor-cores NUM    Executor使用的总内核数
--executor-cores NUM    每个Bxecutor使用的内核数
-num-executors NUM    启动的Executor数量
spark.eventLog.dir
保存日志相关信息的路径,可以是“hdfs://” 开头的 HDES 路径,也可以
是“e:// 开头的本地路径,路径均需要提前创建
spark.eventLog.enabled    是否开启日志记录
spark.cores.max
当应用程序运行在 Standalone 集群或粗粒度共享模式 Mesos 集群时,应用
程序向集群(不是每台机器,而是整个集群)请求的最大 CPU内核总数。如果不设置,那么对于 Standalone 集群将使用 spark,deploy. defaultCores 指定的数值,而 Mesos 集群将使用集群中可用的内核

Spark SQL ——结构化数据文化处理

 一.DataFrame简介

DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。

二.DataFrame的创建

1.数据准备

2.已存在的RDD调用toDF()方法转换得道DataFrame

3.通过Spark读取数据源直接创建DataFrame。

三.DataFrame的常用操作

1.操作DataFrame的常用方法,具体如下表:

2.select实现队列进行重命名

3.selectExpr()方法指定字段特殊处理

4.filter()/where条件查询

5.groupBy()对数据进行分组

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/煮酒与君饮/article/detail/819904
推荐阅读
相关标签
  

闽ICP备14008679号