当前位置:   article > 正文

Spark笔记一:简介_flatmap(func)与map(func)相似,应用于(key,value)键值对的数据集时,返

flatmap(func)与map(func)相似,应用于(key,value)键值对的数据集时,返回一个新的(

创建:RDD = sc.parallelize(data) or sc.textFile(file path)
RDD = sc.textFile(json file path).map(lambda s : json.loads(s))

RDD基本操作:

Transformation:

  • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
  • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
  • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
  • union(otherDataset):返回新的数据集,包括原数据集和参数数据集的所有元素
  • intersection(otherDataset):返回新数据集,是两个集的交集
  • distinct([numTasks]):返回新的集,包括原集中的不重复元素

针对键值对RDD

  • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
  • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
  • keys(): 只会把键值对RDD中的key返回形成一个新的RDD
  • values(): 只会把键值对RDD中的value返回形成一个新的RDD
  • sortByKey(): 功能是返回一个根据键排序的RDD
  • mapValues(func): 对键值对RDD的value部分进行处理,而不是同时对key和value进行处理。返回形成一个新的RDD.
  • join(RDD): 包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等, 对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
  • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) :用于键值对RDD时返回(K,U)对集,对每一个Key的value进行聚集计算
  • cogroup(otherDataset, [numTasks]) : 用于两个键值对RDD时返回(K, (V迭代器, W迭代器))RDD
    Action:
  • count() 返回数据集中的元素个数
  • collect() 以数组的形式返回数据集中的所有元素
  • first() 返回数据集中的第一个元素
  • take(n) 以数组的形式返回数据集中的前n个元素
  • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
  • foreach(func) 将数据集中的每个元素传递到函数func中运行
  • takeOrder(n, [ordering]) | 返回排序后的前n个元素
  • saveAsTextFile(path) | 将数据集的元素写成文本文件
  • saveAsSequenceFile(path) | 将数据集的元素写成序列文件,这个API只能用于Java和Scala程序
  • saveAsObjectFile(path) | 将数据集的元素使用Java的序列化特性写到文件中,这个API只能用于Java和Scala程序
  • countByCount() | 只能用于键值对RDD,返回一个(K, int) hashmap,返回每个key的出现次数
    持久化:
  • RDD.persist(MEMORY_ONLY, MEMORY_AND_DISK)
  • RDD.cache()
  • RDD.unpersist()

rdd = sc.parallelize(data,2)设置2个分区
广播变量broadcastvar = sc.broadcast([1, 2, 3])

DataFrame

DataFrame创建:读取文件创建

>>> spark=SparkSession.builder.getOrCreate()
>>> df = **spark.read.json**("file:///usr/local/spark/examples/src/main/resources/people.json")
>>> df.show()
`>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 
// 选择多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
// 条件过滤
>>> df.filter(df.age > 20).show()
// 分组聚合
>>> df.groupBy("age").count().show()
// 排序
>>> df.sort(df.age.desc()).show()
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
//对列进行重命名
>>> df.select(df.name.alias("username"),df.age).show()``
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

DataFrame创建:RDD转换—Spark官网提供了两种方法来实现从RDD转换得到DataFrame,第一种方法是,利用映射来推断包含特定类型对象的RDD的schema,适用对已知数据结构的RDD转换;第二种方法是,使用编程接口,构造一个schema并将其应用在已知的RDD上。

利用映射机制推断RDD模式:会用到toDF()方法

>>> from pyspark.sql.types import Row
  • 1
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/819803
推荐阅读
相关标签
  

闽ICP备14008679号