赞
踩
1.RDD与DataFrame转换
(1)toDF方法:将RDD转换为DataFrame;
- ## 创建RDD
- val rdd: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "ww", 20), (2, "ss", 30), (3, "xx", 40)))
- ## 指定列名
- val df: DataFrame = rdd.toDF("id", "name", "age")
- ## 不指定列名
- val df1: DataFrame = rdd.toDF()
- ## 展示
- df.show()
- df1.show()
(2)rdd方法:将DataFrame转换为RDD。
- val rowRDD: RDD[Row] = df.rdd
- ## 输出
- rowRDD.collect().foreach(println)
2.DataFrame与DataSet转换
(1)as方法:将DataFrame转换为DataSet,使用 as[]
方法时需要指明数据类型或者采用样例类的方式;
- ## 引入隐式转换
- import spark.implicits._
- ## 创建样例类(不能创建于main方法中)
- case class User(id:Int,name:String,age:Int)
- ## 指定数据类型
- val ds: Dataset[(Int,String,Int)] = df.as[(Int,String,Int)]
- ## 采用样例类
- val ds1: Dataset[User] = df.as[User]
- ## 展示
- ds.show()
- ds1.show()
(2)toDF方法:将DataSet转换为DataFrame。
- ## 转换
- val df2: DataFrame = ds.toDF()
- ## 展示
- df2.show()
3.RDD与DataSet转换
(1)toDS方法:将RDD转换为DataSet,使用 toDS()
方法时可以先将数据包装为样例类的形式也可以直接以数据类型输出;
- ## 通过case将样例类User与数据进行匹配
- val ds2: Dataset[User] = rdd.map {
- case (id, name, age) => {
- User(id, name, age)
- }
- }.toDS()
- ## 直接转换
- val ds3: Dataset[(Int, String, Int)]rdd.toDS()
- ## 展示
- ds2.show()
- ds3.show()
(2)rdd方法:将DataSet转换为RDD
- ## 转换
- val userRDD: RDD[User] = ds1.rdd
- ## 输出
- userRDD.collect().foreach(println)
DD 转换成 DataFrame、Dataset: 1、读取list
数据创建 RDD; 2、将 RDD转换为 DataFrame,并指定列名为("id","name","sex","age")
; 3、将 RDD转换为 DataSet,并以样例类的方式转换。
DataFrame 转换成 RDD、DataSet: 1、读取staff.josn
文件创建 DataFrame; 2、将 DataFrame转换为 RDD; 3、将 DataFrame转换为 DataSet。
DataSet 转换成 RDD、DataFrame: 1、读取staff2.json
文件创建 DataSet,并以Staff
样例类的方法创建; 2、将 DataSet转换为 DataFrame; 3、将 DataSet转换为 RDD。
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, sql}
- import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
- object sparkSql_transform {
-
- case class Message()
- def main(args: Array[String]): Unit = {
-
- val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
- val spark =SparkSession.builder().config(sparkConf).getOrCreate()
- import spark.implicits._
-
- val list = List((202201, "Mark", "female", 23), (202202, "Peter", "female", 24), (202203, "Anna", "male", 21))
-
- val path1 = "/data/workspace/myshixun/step1/data/staff.json"
- val path2 = "/data/workspace/myshixun/step1/data/staff2.json"
-
- /********* Begin *********/
-
- /********* RDD 转换成 DataFrame、DataSet *********/
- // 读取list数据创建RDD
- val rdd:RDD[(Int,String,String,Int)]=spark.sparkContext.makeRDD(list)
-
- // 将RDD转换为DataFrame,指定列名为("id","name","sex","age"),并打印输出
- val df:DataFrame=rdd.toDF("id","name","sex","age")
- df.show()
-
- // 将RDD转换为DataSet,以样例类的方式转换,并打印输出
- val ds=rdd.map{line=>Staff(line._1,line._2,line._3,line._4)}.toDS()
- ds.show()
-
- /********* DataFrame 转换成 RDD、DataSet *********/
-
- // 读取staff.josn文件创建DataFrame
- val df1: DataFrame = spark.read.json(path1)
-
- // 将DataFrame转换为RDD,并打印输出
- val rdd1=df1.rdd
- rdd1.collect().foreach(println)
-
- // 将DataFrame转换为DataSet,并打印输出
- val ds1=df1.as[Staff]
- ds1.show()
-
- /********* DataSet 转换成 RDD、DataFrame *********/
- // 读取staff2.json文件创建DataSet,并以Staff样例类的方法创建
- val ds2: Dataset[Staff] = spark.read.json(path2).as[Staff]
-
- // 将DataSet转换为DataFrame,并打印输出
- val df2=ds2.toDF
- df2.show()
-
- // 将DataSet转换为RDD,并打印输出
- val rdd2=ds2.rdd
- rdd2.collect().foreach(println)
-
- /********* End *********/
-
- // TODO 关闭环境
- spark.close()
-
- }
- // Staff样例类
- case class Staff(id: BigInt,name: String,sex: String,age: BigInt)
- }
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。