当前位置:   article > 正文

scala spark 创建DataFrame的五种方式

scala dataframe

scala spark 创建DataFrame的多种方式

1. 通过RDD[Row]和StructType创建

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  4. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  5. /**
  6. *通过RDD[Row]和StructType创建DataFrame
  7. */
  8. object DataFrameDemo {
  9. def main(args: Array[String]): Unit = {
  10. Logger.getLogger("org").setLevel(Level.OFF)
  11. // 通过SparkSession创建spark入口
  12. val spark: SparkSession = SparkSession.builder()
  13. .appName(this.getClass.getSimpleName)
  14. .master("local[*]")
  15. .getOrCreate()
  16. println("1. 通过RDD[Row]和StructType创建")
  17. //1. 通过RDD[Row]和StructType创建
  18. val sparkRdd1: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
  19. // 将RDD与Row联合
  20. val rowRdd: RDD[Row] = sparkRdd1.map(t => {
  21. val per: Array[String] = t.split(",")
  22. Row(per(0), per(1).toInt, per(2))
  23. })
  24. // 创建StructType实例,设置字段名和类型
  25. val schema: StructType = StructType(
  26. List(
  27. StructField("name", StringType),
  28. StructField("age", IntegerType),
  29. StructField("sex", StringType)
  30. )
  31. )
  32. // 创建dataFrame
  33. val dataFrame: DataFrame = spark.createDataFrame(rowRdd, schema)
  34. // 展示数据
  35. dataFrame.show()
  36. // 释放资源
  37. spark.stop()
  38. }
  39. }

输出结果

  1. 1. 通过RDD[Row]和StructType创建
  2. +----+---+---+
  3. |name|age|sex|
  4. +----+---+---+
  5. | X| 22| M|
  6. | y| 21| W|
  7. | N| 22| M|
  8. +----+---+---+

2. 直接通过Rdd.toDF()创建DataFrame

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. /**
  4. * 通过RDD.toDF直接创建DataFrame
  5. */
  6. object DataFrameDemo2 {
  7. def main(args: Array[String]): Unit = {
  8. Logger.getLogger("org").setLevel(Level.OFF)
  9. // 通过SparkSession创建spark入口
  10. val spark: SparkSession = SparkSession.builder()
  11. .appName(this.getClass.getSimpleName)
  12. .master("local[*]")
  13. .getOrCreate()
  14. println("2. 直接通过Rdd.toDF()创建DataFrame")
  15. // 创建RDD
  16. val sparkRdd3: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
  17. // 将RDD与Row联合
  18. val toDFRdd: RDD[(String, Int, String)] = sparkRdd3.map(t => {
  19. val per: Array[String] = t.split(",")
  20. (per(0), per(1).toInt, per(2))
  21. })
  22. import org.apache.spark.sql.functions._
  23. import spark.implicits._
  24. // 创建DataFrame,设置schema字段名
  25. //写法1
  26. //val frame: DataFrame = toDFRdd.toDF("name", "age", "sex")
  27. //写法2
  28. val array =Array("name", "age", "sex")
  29. val frame: DataFrame = toDFRdd.toDF(array: _*)
  30. // 展示数据
  31. frame.agg(sum("age") as "avg_age").show()
  32. // 释放资源
  33. spark.stop()
  34. }
  35. }

输出结果

  1. 2. 直接通过Rdd.toDF()创建DataFrame
  2. +-------+
  3. |avg_age|
  4. +-------+
  5. | 65|
  6. +-------+

3. 通过RDD和ScalaBean创建DataFrame

  1. import org.apache.log4j.{Level, Logger}
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.{DataFrame, SparkSession}
  4. /**
  5. * 通过RDD和scalabean创建DataFrame
  6. */
  7. object DataFrameDemo3 {
  8. def main(args: Array[String]): Unit = {
  9. Logger.getLogger("org").setLevel(Level.OFF)
  10. // 通过SparkSession创建spark入口
  11. val spark: SparkSession = SparkSession.builder()
  12. .appName(this.getClass.getSimpleName)
  13. .master("local[*]")
  14. .getOrCreate()
  15. println("3. 通过RDD和ScalaBean创建DataFrame")
  16. val sparkRdd: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
  17. // 关联SacalaBean的RDD
  18. val beanRdd: RDD[Per] = sparkRdd.map(t => {
  19. val per: Array[String] = t.split(",")
  20. Per(per(0), per(1).toInt, per(2))
  21. })
  22. // 必须导入隐式转换才能使用.toDF
  23. import spark.implicits._
  24. // 创建dataFrame
  25. val df: DataFrame = beanRdd.toDF
  26. // 创建视图
  27. df.createTempView("t_per")
  28. // 查询数据
  29. val res: DataFrame = spark.sql("SELECT name,age FROM t_per ORDER BY age")
  30. // 展示数据
  31. res.show()
  32. // 释放资源
  33. spark.stop()
  34. }
  35. //ScalaBean
  36. case class Per(name: String, age: Int, sex: String)
  37. }

输出结果

  1. 3. 通过RDD和ScalaBean创建DataFrame
  2. +----+---+
  3. |name|age|
  4. +----+---+
  5. | y| 21|
  6. | N| 22|
  7. | X| 22|
  8. +----+---+

4. 通过RDD联合JavaBean创建DataFrame

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. /**
  4. * 通过RDD和javabean创建DataFrame
  5. */
  6. object DataFrameDemo4 {
  7. def main(args: Array[String]): Unit = {
  8. Logger.getLogger("org").setLevel(Level.OFF)
  9. // 通过SparkSession创建spark入口
  10. val spark: SparkSession = SparkSession.builder()
  11. .appName(this.getClass.getSimpleName)
  12. .master("local[*]")
  13. .getOrCreate()
  14. println("4. 通过RDD联合JavaBean创建DataFrame")
  15. // 创建RDD
  16. val sparkRdd4: RDD[String] = spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))
  17. // 将RDD与Row联合
  18. val javaBeanRdd: RDD[Person] = sparkRdd4.map(t => {
  19. val per: Array[String] = t.split(",")
  20. new Person(per(0), per(1).toInt, per(2))
  21. })
  22. // 创建DataFrame
  23. val frame1: DataFrame = spark.createDataFrame(javaBeanRdd, classOf[Person])
  24. // 展示数据
  25. frame1.show()
  26. // 释放资源
  27. spark.stop()
  28. }
  29. }

Java bean class

  1. public class Person {
  2. private String name;
  3. private int age;
  4. private String sex;
  5. public Person(String name, int age, String sex) {
  6. this.name = name;
  7. this.age = age;
  8. this.sex = sex;
  9. }
  10. public String getName() {
  11. return name;
  12. }
  13. public void setName(String name) {
  14. this.name = name;
  15. }
  16. public int getAge() {
  17. return age;
  18. }
  19. public void setAge(int age) {
  20. this.age = age;
  21. }
  22. public String getSex() {
  23. return sex;
  24. }
  25. public void setSex(String sex) {
  26. this.sex = sex;
  27. }
  28. }

输出结果

  1. 4. 通过RDD联合JavaBean创建DataFrame
  2. +---+----+---+
  3. |age|name|sex|
  4. +---+----+---+
  5. | 22| X| M|
  6. | 21| y| W|
  7. | 22| N| M|
  8. +---+----+---+

5. 手写的通过RDD和javabean创建DataFrame

  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame, SparkSession}
  3. /**
  4. * 通过RDD和javabean创建DataFrame
  5. */
  6. object DataFrameDemo5 {
  7. def main(args: Array[String]): Unit = {
  8. Logger.getLogger("org").setLevel(Level.OFF)
  9. // 通过SparkSession创建spark入口
  10. val spark: SparkSession = SparkSession.builder()
  11. .appName(this.getClass.getSimpleName)
  12. .master("local[*]")
  13. .getOrCreate()
  14. println("5、手写的示例5")
  15. val rdd11 = spark.sparkContext.parallelize(Seq(
  16. Row(8, "bat"),
  17. Row(64, "mouse"),
  18. Row(-27, "horse")
  19. ))
  20. rdd11.foreach(println)
  21. // 创建
  22. val newRDD: RDD[NumberWord]= rdd11.map(x=>{
  23. val rdto = convertNumberWordDTO(x.getInt(0)+100,x.getString(1))
  24. rdto
  25. })
  26. val ds = spark.createDataFrame(newRDD, classOf[NumberWord])
  27. ds.show(100,false)
  28. println(ds.schema) //StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
  29. //调用 someDF.schema.printTreeString()得到someDF的schema:
  30. ds.schema.printTreeString()
  31. //root
  32. // |-- number: integer (nullable = false)
  33. // |-- word: string (nullable = true)
  34. // 释放资源
  35. spark.stop()
  36. }
  37. def convertNumberWordDTO(number: Int,word:String) ={
  38. val rdto = new NumberWord()
  39. rdto.setNumber(number)
  40. rdto.setWord(word)
  41. rdto
  42. }
  43. }

Java bean class

  1. public class NumberWord {
  2. private int number;
  3. private String word;
  4. public NumberWord() {
  5. }
  6. public NumberWord(int number, String word) {
  7. this.number = number;
  8. this.word = word;
  9. }
  10. public int getNumber() {
  11. return number;
  12. }
  13. public void setNumber(int number) {
  14. this.number = number;
  15. }
  16. public String getWord() {
  17. return word;
  18. }
  19. public void setWord(String word) {
  20. this.word = word;
  21. }
  22. }

输出结果

  1. 5、手写的示例5
  2. [8,bat]
  3. [64,mouse]
  4. [-27,horse]
  5. +------+-----+
  6. |number|word |
  7. +------+-----+
  8. |108 |bat |
  9. |164 |mouse|
  10. |73 |horse|
  11. +------+-----+
  12. StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
  13. root
  14. |-- number: integer (nullable = false)
  15. |-- word: string (nullable = true)

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/693318
推荐阅读
相关标签
  

闽ICP备14008679号