赞
踩
目录
1、通过SparkSession中的createDataset来创建Dataset
2、DataFrame通过“as[ElementType]”方法转换得到Dataset
在编写程序代码时,经常需要用到各种的数据结构,选择合适的数据结构可以带来更高的运行或者存储效率Scala提供了许多数据结构,例如常见的数组、元组和集合等。
数组(Array)主要用来存储数据类型相同的元素
Scala中的数组分为 定长数组 和 变长数组 ,这两种数组的定义方式如下:
- new Array[T](数组长度) //定义定长数组
- ArrayBuffer[T]() //定义变长数组
注意:定义变长数组时,则需要导包import scala,collection.mutable.ArrayBuffer
通过一个例子来演示Scala数组简单使用
Scala中,如果想要获取数组中的每一个元素,则需要将数组进行遍历操作。迭代输出。
数组转换就是通过yield关键字将原始的数组进行转换,会产生一个新的数组,然而原始的数组保持不变。定义一个数组,实现将偶数取出乘以10后生成一个新的数组。
scala中,集合有三大类: List、set以及Map
Scala 集合分为可变的(mutable)和不可变(immutable)的集合。
Scala的列表(List)与数组相似,不同的是列表不可变,不能通过赋值进行更改。
在Scala中,List列表和数组类似,列表的所有元素都具有相同类型。这里的List默认是不可变列表,
如果要定义可变列表,需要导入 import scala.collection.mutable.ListBuffer 包
- // 字符串
- val fruit: List[String]= List("apples", "oranges", "pears")
- // 整型
- val nums:List[Int]= List(1,2,3,4)
- //空
- val empty: List[Nothing]= List()
- //二维列表
- val dim:List[List[Int]]=
- List(
- List(1,0,8)
- List(0,1,0),
- List(0,0,1)
注:在scala中,可以使用“Nil”和“::”操作符来定义列表
- // 字符串
- val fruit ="apples"::("oranges"::("pears"::Nil))
- // 整型
- val nums=1::(2::(3::(4::Nil)))
- // 空列表
- val empty = Nil
- // 二维列表
- val dim=(1::(0::(0::Ni1)))::
- (0::(1::(0::Ni1)))::
- (0::(0::(1::Ni1)))::Ni1
在Scala中,Set是没有重复对象的集合,所有元素都是唯一的。默认情况下,Scala 使用不可变Set集合,若想使用可变的Set集合,
则需要引入 scala.collection.mutable.Set包。定义Set集合的语法格式如下:
val set:SetlInt]= Set(1,2.3,4,5)
在Scala中,Map是一种可迭代的键值对(key/value)结构,并且键是唯一的。
若需要使用可变的Map集合,则需要import scala.collection.mutable.Map类.
var A:Map[Char,Int]= Map(键 ->值,键 ->值...)//Map键值对,键为Char,值为Int
定义一个Map集合colors,使用Map常见的方法对集合colors进行相关的操作
元组是一种类似与列表的类,但与列表不同的是,元组可以包含不同类型的元素,元组的值是通过将单个的值包含在圆括号中;构成Scala的元组是对多个不同类型对象的一种简单封装,它将不同的值用小括号括起来,并用逗号作分隔,即表示元组。
val tuple=(元素,元素...)
定义一个元组元组包含3个元素,对应的类型(Int,Double,String)
val x=new tuble3(1,13.14,"x")
在Scala中,获取元组中的值是通过下划线加脚标(例如:tuple.1,tuple.2)来获取的,元组中的元素脚标是从1开始的。
- scala> tuple._1 # 获取第一个值
- res2: String = itcast
-
- scala> tuple._2 # 获取第二个值
- res3: Double = 3.14
map()方法可通过一个函数重新计算列表中的所有元素,并且返回一个包含相同数目元素的新列表。
flatMap()方法结合了 map()方法和 flatten()方法的功能,接收一个可以处理嵌套列表的函数,再对返回结果进行连接。
foreach()方法和 map()方法类似,但是foreach()方法没有返回值,只用于对参数的失进行输出。
使用 filter()方法可以移除传入函数的返回值为 false 的元素。
flatten()方法可以将嵌套的结构展开,即 flatten()方法可以将一个二維的列表展开成一个一维的列表。定义一个二维列表list,通过 flatten()方法可以将 list 展开为一维列表。
groupBy ()方法可对集合中的元素进行分组操作,返回的结果是一个映射。
Scala是一种面向对象的语言
无论是在Scala中还是Java中,类都是对象的抽象,而对象都是类的具体实例;创建类的语法格式如下:
class 类名|参数列表]
当类创建好之后,若是想要访问类中的方法和字段,就需要创建一个对象。创建对象的语法格式如下:
类名 对象名称 = new 类名();
下面,我们创建一个Point类,并在类中定义两个字段x和y以及一个没有返回值的move()方法,使用Point类的实例对象来访问类中的方法和字段
Scala和Java类似,,只允许继承一个父类。不同的是,Java只能继承父类中非私有的属性和方法。而Scala可以继承父类中的所有属性和方法。
下面,创建一个Point类和一个Location类,并且Location类继承Point类,演示子类Location重写父类Point中的字段
- class Point(val xc:Int, val yc:Int){
- var x:Int =xc
- var y:Int =yc
- def move(dx:Int,dy:Int): Unit ={
- x=x+ dx
- y=y + dy
- println("x的坐标点:"+x)
- println("y的坐标点:“+y)
- }
- }
- //定义子类:Location 继承Point类
- class Location(override val xc:Int, override val yc:Int, val zc:Int) extends Point(xc,yc){
- var z:Int =zc
- def move(dx:Int, dy:Int, dz:Int): unit ={
- }
- }
- object ExtendsTestfde {
- def main(args:Array[string]):unit ={
- //创建一个子类对象:Location
- val loc = new Location(10,20,15)
- //移到一个新的位置
- loc.move(10,10,5)
- }
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合
RDD的创建有3种不同的方法。
第一种是将程序中已存在的Seq集合(如集合、列表、数组)转换成RDD。
第二种是对已有RDD进行转换得到新的RDD,这两种方法都是通过内存中已有的集合创建RDD的。
第三种是直接读取外部存储系统的数据创建RDD。
要转化的集合,必须是Seq集合。Seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。
分区数。若不设分区数,则RDD的分区数默认为该程序分配到的资源的CPU核心数。
makeRDD()方法有两种使用方式。
第一种方式的使用与parallelize()方法一致;
第二种方式是通过接收一个是Seq[(T,Seq[String])]参数类型创建RDD。
从外部存储系统中读取数据创建RDD可以有很多种数据来源,可通过SparkContext对象的textFile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。
分别读取HDFS文件和Linux本地文件的数据并创建RDD,具体操作如下。
通过HDFS文件创建RDD
直接通过textFile()方法读取HDFS文件的位置即可。
通过Linux本地文件创建RDD
map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。
map()方法是转换操作,不会立即进行计算。
转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。
sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。
第1个参数是一个函数f:(T) => K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值
第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。
第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size。
第一个参数是必须输入的,而后面的两个参数可以不输入。
collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。
因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。
因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。
flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。
使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。
这个转换操作通常用来切分单词。
take(N)方法用于获取RDD的前N个元素,返回数据为数组。
take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。
获取RDD的前5个元素
union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。
使用union()方法合并两个RDD
filter()方法是一种转换操作,用于过滤RDD中的元素。
filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。
filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。
创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。
distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。
创建一个带有重复数据的RDD,并使用distinct()方法去重。
intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。
创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。
subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。
创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。
cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。
创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。
键值对RDD,包含键和值两个部分。
Spark提供了两种方法,分别获取键值对RDD的键和值。
keys方法返回一个仅包含键的RDD。
values方法返回一个仅包含值的RDD。
reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。
reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。
groupByKey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]。
将有键的一组数据与另一组有键的数据根据键进行连接,是对键值对数据常用的操作之一。
与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。
join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。
rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。
在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。
leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。
fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。
zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。
将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
combineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。
combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。
combineByKey()方法的使用方式如下。
combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)
combineByKey()方法接收3个重要的参数,具体说明如下。
createCombiner:V=>C,V是键值对RDD中的值部分,将该值转换为另一种类型的值C,C会作为每一个键的累加器的初始值。
mergeValue:(C,V)=>C,该函数将元素V聚合到之前的元素C(createCombiner)上(这个操作在每个分区内进行)。
mergeCombiners:(C,C)=>C,该函数将两个元素C进行合并(这个操作在不同分区间进行)。
由于合并操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。对于这两种情况,3个参数的执行情况描述如下。
如果以前没出现过,则执行的是createCombiner()方法,createCombiner()方法会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue()方法。
对于已经出现过的键,调用mergeValue()方法进行合并操作,对该键的累加器对应的当前值(C)与新值(V)进行合并。
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法对各个分区的结果(全是C)进行合并。
lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。
1、文本文件的读取
通过textFile()方法即可直接读取,一条记录(一行)作为一个元素。
2、文本文件的存储
RDD数据可以直接调用saveAsTextFile()方法将数据存储为文本文件。
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。
DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。
我们通过Spark读取数据源的方式进行创建DataFrame
查看DataFrame中的具体内容信息
对指定字段进行特殊处理(替换)
查看DataFrame中获取指定字段
实现条件查询,过滤出想要的结果
对记录进行分组
对记录进行分组(desc:降序;asc:升序)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。