1. Spark 1.6

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object Demo01 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getName)
      .setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)

    val studentInfo: RDD[String] = sc.textFile("F:\\ideaProjects\\programer_Scala\\src\\main\\resources\\Student")
    val splitedInfo: RDD[Array[String]] = studentInfo.map(str => str.split(" "))

    /**
     * Method 1: map to an RDD of a case class, then call toDF()
     */
    val studentRDD: RDD[Student] = splitedInfo.map(arr => Student(arr(0), arr(1).toDouble, arr(2).toDouble))
    import sqlContext.implicits._
    val personDF1: DataFrame = studentRDD.toDF()
    personDF1.registerTempTable("t_student1")
    val result1: DataFrame = sqlContext.sql("select * from t_student1 where math>60")
    result1.show()
    result1.write.mode("overwrite").json("F:\\ideaProjects\\programer_Scala\\src\\main\\resources\\student_out1")

    /**
     * Method 2: build a StructType schema and create the DataFrame from an RDD[Row]
     */
    val schema = StructType(
      List(
        StructField("name", StringType, true),
        StructField("English", DoubleType, true),
        StructField("math", DoubleType, true)
      )
    )
    val rowRDD: RDD[Row] = splitedInfo.map(p => Row(p(0).trim, p(1).toDouble, p(2).toDouble))
    val personDF2: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
    personDF2.registerTempTable("t_student2")
    val result2: DataFrame = sqlContext.sql("select * from t_student2 where English>60")
    result2.show()
    result2.write.mode("append").json("F:\\ideaProjects\\programer_Scala\\src\\main\\resources\\student_out2")

    sc.stop()
  }
}

case class Student(name: String, English: Double, math: Double)
2. Spark 2.2.0

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

object RddAndDataFrameAndDataset {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local")
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext

    val lineRdd: RDD[String] = sc.textFile("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\booksInfo")
    val arrayRdd: RDD[Array[String]] = lineRdd.map(line => line.split(" "))

    // Conversion 1: RDD[case class] --> Dataset[case class]
    val caseClassRdd: RDD[BookAndAuthor] = arrayRdd.map(arr => BookAndAuthor(arr(0), arr(1)))
    println(caseClassRdd.collect().toBuffer)
    // Import the implicits so that toDS() can turn RDD[BookAndAuthor] into Dataset[BookAndAuthor].
    // Here "spark" is the name of the SparkSession instance.
    import spark.implicits._
    val caseClassDataset: Dataset[BookAndAuthor] = caseClassRdd.toDS()
    //caseClassDataset.show()

    // Conversion 2: RDD --> DataFrame
    val tupleRdd: RDD[(String, String)] = arrayRdd.map(arr => (arr(0), arr(1)))   // tuple RDD
    val dataFrame1: DataFrame = tupleRdd.toDF("bookName", "author")               // attach the schema, i.e. the column names
    // or: RDD[BookAndAuthor] already carries type and schema information, so it can be converted directly
    val dataFrame2: DataFrame = caseClassRdd.toDF()

    // Conversion 3: DataFrame --> Dataset[case class]
    // A second case class, BookAndAuthor2, is defined as the target type
    val dataset: Dataset[BookAndAuthor2] = dataFrame1.as[BookAndAuthor2]
    dataset.show()

    // Dataset --> DataFrame --> RDD[Row]
    val toDF1: DataFrame = dataset.toDF()
    val rddRow: RDD[Row] = dataFrame1.rdd   // each row is wrapped in a Row
    rddRow.foreach(row => {
      println(row.getString(0) + "===" + row.getString(1))   // read fields out of the Row
    })
    // or: Dataset[case class] --> RDD[case class]
    val toRdd: RDD[BookAndAuthor2] = dataset.rdd

    spark.stop()   // release resources
  }
}

case class BookAndAuthor(bookName: String, author: String)
case class BookAndAuthor2(bookName: String, author: String)
Conversion relationship diagram (image not shown):
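As a minimal stand-in for the diagram, the conversions it summarizes can be written out directly in code; this is only a sketch, and the Book case class and sample values below are made up for illustration.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class Book(bookName: String, author: String)

object ConversionSummary {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ConversionSummary").master("local[*]").getOrCreate()
    import spark.implicits._

    val rdd: RDD[Book] = spark.sparkContext.parallelize(Seq(Book("book1", "author1")))

    val df: DataFrame           = rdd.toDF()      // RDD       --> DataFrame
    val ds: Dataset[Book]       = rdd.toDS()      // RDD       --> Dataset
    val dsFromDf: Dataset[Book] = df.as[Book]     // DataFrame --> Dataset
    val dfFromDs: DataFrame     = ds.toDF()       // Dataset   --> DataFrame
    val rowRdd: RDD[Row]        = df.rdd          // DataFrame --> RDD[Row]
    val bookRdd: RDD[Book]      = ds.rdd          // Dataset   --> RDD[Book]

    spark.stop()
  }
}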
3. Grouped Top-N

3.1 Spark SQL implementation
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object GroupAndTopNSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val lineRDD = sc.textFile("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\advertisementInfo")
    // val dataFrame1: DataFrame = spark.read.format("textfile").load("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\advertisementInfo")
    val splitRDD: RDD[Array[String]] = lineRDD.map(_.split(" "))
    val caseclassRDD: RDD[Advertisement] = splitRDD.map(arr => Advertisement(arr(0).toLong, arr(1), arr(2), arr(3), arr(4)))

    import spark.implicits._
    val dataFrame1 = caseclassRDD.toDF()
    dataFrame1.createTempView("ads_advertisementInfo")

    val topNSQL =
      """
        |select
        |  provence,
        |  ad,
        |  count
        |from
        |(
        |  select
        |    provence,
        |    ad,
        |    count,
        |    row_number() over(partition by provence order by count desc) top
        |  from
        |  (
        |    select provence, ad, count(*) count
        |    from ads_advertisementInfo
        |    group by provence, ad
        |  ) a
        |) b
        |where b.top = 1
      """.stripMargin

    val dataFrame2: DataFrame = spark.sql(topNSQL)
    dataFrame2.write.format("parquet").mode("overwrite").save("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\out\\advertisementInfo")

    spark.stop()
  }
}

case class Advertisement(timestample: Long, provence: String, city: String, user: String, ad: String)
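For comparison, here is a hedged DSL sketch of the same top-1-per-province query. It assumes a DataFrame with the same provence/ad columns as dataFrame1 above; the object and method names (TopNDsl, topAdPerProvence) are made up for illustration.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, row_number}

object TopNDsl {
  // DSL equivalent of the SQL above; `ads` must expose the same provence/ad columns as dataFrame1.
  def topAdPerProvence(ads: DataFrame): DataFrame = {
    val clickCounts = ads.groupBy("provence", "ad").count()   // clicks per (province, ad); the count column is named "count"
    val ranked = clickCounts.withColumn(
      "top",
      row_number().over(Window.partitionBy("provence").orderBy(desc("count"))))   // rank ads within each province by clicks
    ranked.where("top = 1").select("provence", "ad", "count")                      // keep only the most-clicked ad
  }
}

Usage would simply be TopNDsl.topAdPerProvence(dataFrame1).show(); the query plan Spark produces is essentially the same as for the SQL string, so the choice between the two styles is mostly about readability.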
3.2 Spark Core implementation

Method 1:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GroupAndTopN2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Input line format: timestamp province city user ad
    val lineRDD: RDD[String] = sc.textFile("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\advertisementInfo")

    // (province_ad, 1)
    val provenceAndAdvertisementRDD: RDD[(String, Long)] = lineRDD.map { line =>
      val provence = line.split(" ")(1)
      val advertisement = line.split(" ")(4)
      val key = provence + "_" + advertisement
      (key, 1L)
    }

    // (province_ad, click count)
    val provenceAndAdvertiseReduceBykeyRDD: RDD[(String, Long)] = provenceAndAdvertisementRDD.reduceByKey(_ + _)

    val groupByRDD: RDD[(String, Iterable[(String, Long)])] = provenceAndAdvertiseReduceBykeyRDD.groupBy(_._1.split("_")(0))

    val topnRDD: RDD[(String, List[(String, Long)])] = groupByRDD.map { x =>
      val provence = x._1
      val list = x._2.toList
      // sortBy sorts ascending by default, so reverse the result for descending order
      val descSort = list.sortBy(_._2).reverse.take(2)
      // val descSort: List[(String, Long)] = list.sortWith(_._2 > _._2).take(1)
      (provence, descSort)
    }

    println(topnRDD.collect().toBuffer)
  }
}
Method 2:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object GroupAndTopN {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext
    import spark.implicits._

    // Input line format: timestamp province city user ad
    val lineRDD: RDD[String] = sc.textFile("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\advertisementInfo")

    // (province, ad)
    val tupleRDD: RDD[(String, String)] = lineRDD.map { line =>
      val provence = line.split(" ")(1)
      val advertisement = line.split(" ")(4)
      (provence, advertisement)
    }

    // (province, Iterable[(ad, 1)])
    val groupRDD: RDD[(String, Iterable[(String, Int)])] = tupleRDD.map {
      case (provence, advertisement) => (provence, (advertisement, 1))
    }.groupByKey()

    // (province, Map[ad, total clicks])
    val proAndAdCountRDD: RDD[(String, Map[String, Long])] = groupRDD.map {
      case (provence, iter) =>
        val groupByAd: Map[String, Iterable[(String, Int)]] = iter.groupBy(_._1)
        val advertiseCount: Map[String, Long] = groupByAd.map {
          case (ad, iter) => (ad, iter.size.toLong)
        }
        (provence, advertiseCount)
    }

    val topnRDD = proAndAdCountRDD.map { x =>
      val provence = x._1
      val list: List[(String, Long)] = x._2.toList
      val descSort = list.sortBy(_._2).reverse.take(2)
      (provence, descSort)
    }

    println(topnRDD.collect().toBuffer)
  }
}
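Note that Method 2 shuffles one record per click through groupByKey, while Method 1 combines counts with reduceByKey before the shuffle. A sketch that keeps a composite key but still pre-aggregates on the map side could look like the following; the object and method names (TopNHelper, topNPerProvince) are made up for illustration.

import org.apache.spark.rdd.RDD

object TopNHelper {
  // Aggregate (province, ad) clicks with reduceByKey first, then take the top n ads per province.
  def topNPerProvince(lines: RDD[String], n: Int = 2): Array[(String, List[(String, Long)])] = {
    lines
      .map { line =>
        val fields = line.split(" ")
        ((fields(1), fields(4)), 1L)                     // ((province, ad), 1)
      }
      .reduceByKey(_ + _)                                // ((province, ad), clicks), combined before the shuffle
      .map { case ((province, ad), clicks) => (province, (ad, clicks)) }
      .groupByKey()                                      // only one aggregated record per (province, ad) is shuffled here
      .mapValues(_.toList.sortBy(-_._2).take(n))         // sort descending by clicks and keep the top n
      .collect()
  }
}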
4. Operating on DataFrame rows

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameExamples {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[*]")
      .getOrCreate()

    val dataFrame1: DataFrame = spark.read.format("text").load("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\booksInfo")

    import spark.implicits._

    // val dataFrame2: DataFrame = dataFrame1.map(row => {
    //   val bookname = row.getString(0).split(" ")(0)
    //   val author = row.getString(0).split(" ")(1)
    //   (bookname, author)
    // }).toDF("bookName", "author")

    // val dataFrame2 = dataFrame1.map {
    //   case row: Row => {
    //     val bookname = row.getString(0).split(" ")(0)
    //     val author = row.getString(0).split(" ")(1)
    //     (bookname, author)
    //   }
    // }.toDF("bookName", "author")

    // Pattern matching: a DataFrame only carries schema information, not compile-time element types,
    // so when mapping over it the types inside the Row pattern must be annotated explicitly.
    val dataFrame2 = dataFrame1.map {
      case Row(line: String) =>
        val bookname = line.split(" ")(0)
        val author = line.split(" ")(1)
        (bookname, author)
    }.toDF("bookName", "author")

    // Return every row that matches the condition
    val dataFrame3 = dataFrame2.where("author='古龙'")
    dataFrame3.show()

    spark.stop()
  }
}
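As an alternative sketch, spark.read.textFile returns a Dataset[String], so the Row pattern match can be avoided entirely. The snippet below assumes the same booksInfo file as above; the object name is made up for illustration.

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameExamplesTextFile {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameExamplesTextFile").master("local[*]").getOrCreate()
    import spark.implicits._

    // read.textFile yields Dataset[String], so each element is already a typed line
    val lines = spark.read.textFile("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\booksInfo")
    val books: DataFrame = lines.map { line =>
      val fields = line.split(" ")
      (fields(0), fields(1))   // (bookName, author)
    }.toDF("bookName", "author")

    books.where("author='古龙'").show()
    spark.stop()
  }
}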
5. Referencing external variables inside stripMargin SQL strings

package sparkSQL

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonFunctions {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()
    val sc = spark.sparkContext

    val lineRDD = sc.textFile("F:\\ideaProjects\\spark-version2\\src\\main\\resources\\ods_event_log.txt")
    import spark.implicits._
    val dataFrame1: DataFrame = lineRDD.toDF("line")
    dataFrame1.createTempView("ods_event_log")

    val sql01 =
      """
        |select
        |  split(line,'\\|')[0] server_time,
        |  get_json_object(split(line,'\\|')[1],'$.et') event_json
        |from ods_event_log
      """.stripMargin
    spark.sql(sql01).createTempView("ods_event_log_tmp1")

    val sql02 =
      """
        |select
        |  server_time,
        |  tmp.event_json
        |from ods_event_log_tmp1
        |lateral view explode(split(regexp_replace(regexp_extract(event_json,'^\\[(.+)\\]$',1),'\\}\\,\\{','\\}\\|\\|\\{'),'\\|\\|')) tmp as event_json
      """.stripMargin
    spark.sql(sql02).createTempView("ods_event_log_tmp2")

    val servertime = "1592116043890"

    // Wrong: without the s interpolator, ${servertime} is kept literally and is never substituted
    val sql03 =
      """
        |select
        |  server_time,
        |  get_json_object(event_json,'$.en') event_name,
        |  event_json
        |from ods_event_log_tmp2
        |where server_time = ${servertime}
      """.stripMargin

    // Correct: use an s-interpolated string; the literal $ in the JSON path '$.en' must then be escaped as $$
    val sql04 =
      s"""
        |select
        |  server_time,
        |  get_json_object(event_json,'$$.en') event_name,
        |  event_json
        |from ods_event_log_tmp2
        |where server_time = ${servertime}
      """.stripMargin

    spark.sql(sql04).show()
  }
}
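The interpolation rule itself is plain Scala rather than anything Spark-specific; a minimal standalone sketch of the same behaviour (the object name is made up):

object InterpolationDemo extends App {
  val servertime = "1592116043890"

  // Plain triple-quoted string: ${servertime} stays literal
  val raw =
    """
      |where server_time = ${servertime}
    """.stripMargin

  // s-interpolated string: ${servertime} is substituted, and a literal $ is written as $$
  val interpolated =
    s"""
      |get_json_object(event_json,'$$.en')
      |where server_time = ${servertime}
    """.stripMargin

  println(raw)           // prints the ${servertime} placeholder unchanged
  println(interpolated)  // prints '$.en' and the actual timestamp value
}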
6. DSL style: join and row_number() over()
package sparkSQL

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object JoinTypeDSL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[2]")
      .getOrCreate()

    val scoreDF = spark.read.format("text").load("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\studentScore.txt")
    val sinfoDF = spark.read.format("text").load("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\studentInfo.txt")
    val tinfoDF = spark.read.format("text").load("F:\\ideaProjects\\spark-sql\\src\\main\\resources\\teacherInfo.txt")

    import spark.implicits._
    import scala.collection.mutable

    // Student score dataset: (student number, name, Map[subject, score])
    val scoreTupleDS: Dataset[(Int, String, mutable.Map[String, Int])] = scoreDF.map {
      case Row(line: String) =>
        val map = mutable.Map[String, Int]()
        val arr = line.split(" ")
        val snumber = arr(0).toInt
        val sname = arr(1)
        val chinese = arr(2).split(":")(0)
        val chineseScore = arr(2).split(":")(1).toInt
        val math = arr(3).split(":")(0)
        val mathScore = arr(3).split(":")(1).toInt
        val english = arr(4).split(":")(0)
        val englishScore = arr(4).split(":")(1).toInt
        val physics = arr(5).split(":")(0)
        val physicsScore = arr(5).split(":")(1).toInt
        val chemistry = arr(6).split(":")(0)
        val chemistryScore = arr(6).split(":")(1).toInt
        val biology = arr(7).split(":")(0)
        val biologyScore = arr(7).split(":")(1).toInt
        map += (chinese -> chineseScore, math -> mathScore, english -> englishScore,
          physics -> physicsScore, chemistry -> chemistryScore, biology -> biologyScore)
        (snumber, sname, map)
    }

    // Flatten to one (snumber, sname, subject, score) record per subject
    val studentScoreDS: Dataset[(Int, String, String, Int)] = scoreTupleDS.map {
      case (snumber: Int, sname: String, map: mutable.Map[String, Int]) =>
        val iter: mutable.Iterable[(Int, String, String, Int)] = map.map {
          case (subject: String, score: Int) => (snumber, sname, subject, score)
        }
        iter.toList
    }.flatMap(tuple => tuple)
    val studentScoreDF = studentScoreDS.toDF("snumber", "sname", "subject", "score")

    import org.apache.spark.sql.functions._
    // Total score per student, ranked by total score in descending order
    val studentTotalScoreDF = studentScoreDF.groupBy("snumber", "sname").agg(sum("score").as("totalScore"))
      .withColumn("_rank", row_number().over(Window.orderBy(desc("totalScore"))))
      .selectExpr("snumber", "sname", "totalScore", "_rank")

    // Student info dataset
    val studentInfoDF = sinfoDF.map {
      case Row(line: String) =>
        val arr = line.split(" ")
        val snumber = arr(0)
        val sname = arr(1)
        val gender = arr(2)
        val age = arr(3)
        val sclass = arr(4)
        (snumber, sname, gender, age, sclass)
    }.toDF("snumber", "sname", "gender", "age", "sclass")

    // Join student total scores with student info
    val resultDF01 = studentTotalScoreDF.join(studentInfoDF, Seq("snumber", "sname"), "left")
      .selectExpr("snumber", "sname", "totalScore", "_rank", "gender", "sclass")

    // Teacher info dataset
    val teacherInfoDF = tinfoDF.map { rowLine =>
      val arr = rowLine.getString(0).split(" ")
      val tnumber = arr(0)
      val tname = arr(1)
      val subject = arr(2)
      val gender = arr(3)
      (tnumber, tname, subject, gender)
    }.toDF("tnumber", "tname", "subject", "gender")

    // Highest score per subject in each class, together with the teacher's info
    val studentScoreJoinSinfoDF = studentScoreDF.join(studentInfoDF,
      studentScoreDF("snumber") === studentInfoDF("snumber") and (studentScoreDF("sname") === studentInfoDF("sname")), "left")
      .select(studentScoreDF("snumber"), studentScoreDF("sname"), studentScoreDF("subject"), studentScoreDF("score"),
        studentInfoDF("gender"), studentInfoDF("age"), studentInfoDF("sclass"))

    // With a plain groupBy the output can only contain the grouping keys and aggregates;
    // columns such as snumber and sname cannot be used:
    // studentScoreJoinSinfoDF.groupBy("sclass", "subject").agg(max("score") as ("max_score"))
    //   .selectExpr("sclass", "subject", "max_score")

    // A window function solves the problem above
    val studentScoreMaxDF = studentScoreJoinSinfoDF.withColumn("_rank", row_number()
      .over(Window.partitionBy("sclass", "subject")
        .orderBy(desc("score"))))
      .selectExpr("sclass", "subject", "score", "snumber", "sname", "gender", "age", "_rank")
      .where("_rank=1")
      .orderBy(asc("sclass"))
      .drop("_rank")

    val resultDF02 = studentScoreMaxDF.join(teacherInfoDF.toDF("tnumber", "tname", "subject", "tgender"), Seq("subject"), "left")
      .selectExpr("sclass", "subject", "score", "snumber", "sname", "gender", "age", "tname", "tgender")

    resultDF01.show()
    resultDF02.show()
  }
}

Test data:

studentScore.txt
1 张三 语文:88 数学:81 英语:61 物理:77 化学:60 生物:63
2 李四 语文:77 数学:82 英语:62 物理:73 化学:63 生物:62
3 王五 语文:78 数学:73 英语:60 物理:71 化学:66 生物:91
4 赵六 语文:55 数学:84 英语:63 物理:74 化学:67 生物:90
5 小明 语文:54 数学:95 英语:66 物理:67 化学:45 生物:29
6 小红 语文:63 数学:76 英语:67 物理:68 化学:44 生物:28
7 小张 语文:62 数学:57 英语:69 物理:69 化学:53 生物:17
8 小李 语文:91 数学:68 英语:70 物理:77 化学:52 生物:88
9 小吴 语文:90 数学:99 英语:76 物理:74 化学:61 生物:63
10 小周 语文:29 数学:50 英语:80 物理:97 化学:68 生物:66
11 小赵 语文:28 数学:49 英语:83 物理:76 化学:95 生物:67
12 小王 语文:17 数学:30 英语:65 物理:80 化学:76 生物:69
13 小孙 语文:16 数学:21 英语:79 物理:83 化学:57 生物:55
14 小强 语文:45 数学:11 英语:64 物理:65 化学:68 生物:55
15 小丽 语文:44 数学:81 英语:61 物理:79 化学:66 生物:95
16 小花 语文:53 数学:81 英语:50 物理:90 化学:50 生物:76
17 小玉 语文:52 数学:61 英语:55 物理:29 化学:49 生物:57
18 小霞 语文:61 数学:71 英语:58 物理:28 化学:30 生物:68
19 如花 语文:68 数学:88 英语:62 物理:17 化学:21 生物:33
20 似玉 语文:78 数学:66 英语:70 物理:16 化学:11 生物:55

studentInfo.txt
1 张三 男 17 1
2 李四 男 17 1
3 王五 男 18 1
4 赵六 男 19 1
5 小明 男 20 2
6 小红 女 16 2
7 小张 男 15 2
8 小李 男 17 2
9 小吴 男 19 3
10 小周 男 18 3
11 小赵 男 15 3
12 小王 男 21 3
13 小孙 男 19 4
14 小强 男 20 4
15 小丽 女 16 4
16 小花 女 18 4
17 小玉 女 17 5
18 小霞 女 19 5
19 如花 女 16 5
20 似玉 女 18 5

teacherInfo.txt
1 聂老师 语文 女
2 马老师 数学 男
3 杨老师 英语 女
4 金老师 物理 男
5 赵老师 化学 男
6 张老师 生物 男

Test results:

resultDF01.show():
|snumber|sname|totalScore|_rank|gender|sclass|
+-------+-----+----------+-----+------+------+
|      9| 小吴|       463|    1|    男|     3|
|      8| 小李|       446|    2|    男|     2|
|      3| 王五|       439|    3|    男|     1|
|      4| 赵六|       433|    4|    男|     1|
|      1| 张三|       430|    5|    男|     1|
|     15| 小丽|       426|    6|    女|     4|
|      2| 李四|       419|    7|    男|     1|
|     16| 小花|       400|    8|    女|     4|
|     11| 小赵|       398|    9|    男|     3|
|     10| 小周|       390|   10|    男|     3|
|      5| 小明|       356|   11|    男|     2|
|      6| 小红|       346|   12|    女|     2|
|     12| 小王|       337|   13|    男|     3|
|      7| 小张|       327|   14|    男|     2|
|     18| 小霞|       316|   15|    女|     5|
|     13| 小孙|       311|   16|    男|     4|
|     14| 小强|       308|   17|    男|     4|
|     17| 小玉|       303|   18|    女|     5|
|     20| 似玉|       296|   19|    女|     5|
|     19| 如花|       289|   20|    女|     5|
+-------+-----+----------+-----+------+------+

resultDF02.show():
|sclass|subject|score|snumber|sname|gender|age|tname|tgender|
+------+-------+-----+-------+-----+------+---+-----+-------+
|     1|   语文|   88|      1| 张三|    男| 17| 聂老师|     女|
|     1|   数学|   84|      4| 赵六|    男| 19| 马老师|     男|
|     1|   生物|   91|      3| 王五|    男| 18| 张老师|     男|
|     1|   化学|   67|      4| 赵六|    男| 19| 赵老师|     男|
|     1|   物理|   77|      1| 张三|    男| 17| 金老师|     男|
|     1|   英语|   63|      4| 赵六|    男| 19| 杨老师|     女|
|     2|   化学|   53|      7| 小张|    男| 15| 赵老师|     男|
|     2|   生物|   88|      8| 小李|    男| 17| 张老师|     男|
|     2|   物理|   77|      8| 小李|    男| 17| 金老师|     男|
|     2|   英语|   70|      8| 小李|    男| 17| 杨老师|     女|
|     2|   语文|   91|      8| 小李|    男| 17| 聂老师|     女|
|     2|   数学|   95|      5| 小明|    男| 20| 马老师|     男|
|     3|   语文|   90|      9| 小吴|    男| 19| 聂老师|     女|
|     3|   数学|   99|      9| 小吴|    男| 19| 马老师|     男|
|     3|   英语|   83|     11| 小赵|    男| 15| 杨老师|     女|
|     3|   生物|   69|     12| 小王|    男| 21| 张老师|     男|
|     3|   物理|   97|     10| 小周|    男| 18| 金老师|     男|
|     3|   化学|   95|     11| 小赵|    男| 15| 赵老师|     男|
|     4|   化学|   68|     14| 小强|    男| 20| 赵老师|     男|
|     4|   英语|   79|     13| 小孙|    男| 19| 杨老师|     女|
+------+-------+-----+-------+-----+------+---+-----+-------+
Converting between RDD and DataFrame

package sparkSQL.dataFrame

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataFrameV2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd01 = sc.textFile("src\\main\\resources\\dataFrame\\ads_recom_theme_user_oper_data_dm.txt")

    // RDD --> DataFrame
    import spark.implicits._
    val df01: DataFrame = rdd01.map { line =>
      val arr = line.split(",")
      val up_id = arr(0)
      val oper_id = arr(1)
      val oper_type = arr(2)
      val item_id = arr(3)
      val oper_occur_time = arr(4)
      (up_id, oper_id, oper_type, item_id, oper_occur_time)
    }.toDF("up_id", "oper_id", "oper_type", "item_id", "oper_occur_time")
    df01.show()

    // DataFrame --> RDD
    val rdd02: RDD[Row] = df01.rdd

    spark.stop()   // release resources
  }
}
Converting between DataFrame and Dataset

package sparkSQL.dataFrame

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object DataFrameV3 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[*]")
      .getOrCreate()

    val df01: DataFrame = spark.read.json("src\\main\\resources\\dataFrame\\ads_recom_theme_user_oper_data_dm.json")

    // DataFrame --> Dataset
    import spark.implicits._
    val ds01: Dataset[UserOper] = df01.as[UserOper]
    ds01.show()

    // Dataset --> DataFrame
    // A DataFrame is simply a Dataset of Row: type DataFrame = Dataset[Row]
    val df02: DataFrame = ds01.toDF()
    val df03: Dataset[Row] = ds01.toDF()
    df03.show()

    spark.stop()   // release resources
  }
}

case class UserOper(up_id: String, oper_id: String, oper_type: String, item_id: String, oper_occur_time: String)
Converting between RDD and Dataset

package sparkSQL.dataFrame

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}

object DataFrameV4 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getName)
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    val rdd01 = sc.textFile("src\\main\\resources\\dataFrame\\ads_recom_theme_user_oper_data_dm.txt")

    val rdd02: RDD[UserOperV2] = rdd01.map { line =>
      val arr = line.split(",")
      val up_id = arr(0)
      val oper_id = arr(1)
      val oper_type = arr(2)
      val item_id = arr(3)
      val oper_occur_time = arr(4)
      UserOperV2(up_id, oper_id, oper_type, item_id, oper_occur_time)
    }

    val rdd03: RDD[(String, String, String, String, String)] = rdd01.map { line =>
      val arr = line.split(",")
      val up_id = arr(0)
      val oper_id = arr(1)
      val oper_type = arr(2)
      val item_id = arr(3)
      val oper_occur_time = arr(4)
      (up_id, oper_id, oper_type, item_id, oper_occur_time)
    }

    // Map the RDD to RDD[case class] first, then convert to Dataset[case class]
    import spark.implicits._
    val ds01: Dataset[UserOperV2] = rdd02.toDS()
    ds01.show()

    // RDD[tuple] --> Dataset[tuple]
    val ds02: Dataset[(String, String, String, String, String)] = rdd03.toDS()

    // Dataset --> RDD[case class]
    val rdd04: RDD[UserOperV2] = ds01.rdd

    spark.stop()   // release resources
  }
}

case class UserOperV2(up_id: String, oper_id: String, oper_type: String, item_id: String, oper_occur_time: String)