Creating a DataFrame from an RDD (Spark SQL)


In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame:
(1) Convert an existing RDD
(2) Load a structured data source such as JSON/Parquet/CSV/JDBC
(3) Query a Hive table

The key idea: a DataFrame is an RDD plus a schema (the metadata describing its structure). The RDD comes from the data; the schema can either be defined by the developer or inferred from the data by the framework.
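
The rest of this article walks through method (1). For reference, here is a minimal sketch of methods (2) and (3); the paths, options, and table name are placeholders and not part of the original article:

// Method (2): load a structured data source directly (path is a placeholder)
val csvDF = spark.read
  .option("header", "true")      // first row holds the column names
  .option("inferSchema", "true") // let the framework infer column types
  .csv("hdfs:///data/doit_stu.csv")

val jsonDF = spark.read.json("hdfs:///data/stu.json")

// Method (3): query an existing Hive table (requires enableHiveSupport() on the builder)
val hiveDF = spark.sql("select * from some_db.stu")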

Creating a DataFrame from an RDD[case class]

package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Way 1 to create a DataFrame from an RDD: turn RDD[T] into an RDD of a case class, then call toDF directly
 */

case class Stu(id:Int,name:String,age:Int,city:String,score:Double)

object Demo2_CreateDF_1 {

  Logger.getLogger("org").setLevel(Level.WARN)
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    // Load the data (which carries no schema information) as an RDD
    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")
    import spark.implicits._
    // Turn RDD[String] into RDD[Stu]
    val rddStu: RDD[Stu] = rdd
      .map(line => line.split(","))
      .map(arr => Stu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // An RDD holding case class instances can then be converted to a DataFrame directly
    val res: DataFrame = spark.createDataFrame(rddStu)
    res.printSchema()
    res.show(50,false)

    // A more concise (and more commonly used) alternative; it requires the implicit conversions: import spark.implicits._
    val df2: DataFrame = rddStu.toDF()

    spark.close()

  }

}
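
Once the DataFrame exists, it can be queried with SQL through the same SparkSession. The following follow-up is not part of the original demo; the view name and query are only illustrative, reusing res and spark from the code above:

// Hypothetical follow-up: expose the DataFrame as a temporary view and query it with SQL
res.createOrReplaceTempView("stu")
spark.sql("select city, avg(score) as avg_score from stu group by city").show()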

Creating a DataFrame from an RDD[Tuple]

package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame from an RDD[tuple]
 */
object Demo3_CreateDF_2 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn RDD[String] into RDD[(f1, f2, f3, ...)]
    val addTuple: RDD[(Int, String, Int, String, Double)] = rdd
      .map(_.split(","))
      .map(arr => (arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // Converting the RDD of tuples to a DataFrame is enough: the framework infers the structure from the tuple
    // val df = addTuple.toDF()   // default column names would be _1 ... _5, as shown below
    /**
     * root
     * |-- _1: integer (nullable = false)
     * |-- _2: string (nullable = true)
     * |-- _3: integer (nullable = false)
     * |-- _4: string (nullable = true)
     * |-- _5: double (nullable = false)
     */
    val df: DataFrame = addTuple.toDF("id", "name", "age", "city", "score")


    df.printSchema()
    df.show(50, false)

    spark.close()

  }

}
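
As a side note beyond the original code, the same tuple-based schema inference also works on a local Seq, which is convenient for quick tests (it still needs import spark.implicits._):

// Minimal sketch: toDF on a local Seq of tuples, no RDD involved
val quickDF = Seq((1, "Alice", 18), (2, "Bob", 20)).toDF("id", "name", "age")
quickDF.show()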

Creating a DataFrame from an RDD[JavaBean]

package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame from an RDD[JavaBean]
 *  What is a JavaBean? Simply a class of the form:
 *  type field1
 *  type field2
 *  getters
 *  setters
 */
object Demo4_CreateDF_3 {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._


    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn RDD[String] into RDD[JavaStu]
    val rddJavaStu: RDD[JavaStu] = rdd.map(line => {

      val arr = line.split(",")

      new JavaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)

    })


    // Turn RDD[JavaStu] into a DataFrame (the schema is derived from the bean's getters)
    val df = spark.createDataFrame(rddJavaStu, classOf[JavaStu])

    df.printSchema()
    df.show()

    spark.close()
  }
}
------------------------------
package day01;

public class JavaStu {
    int id;
    String name;
    int age;
    String city;
    double score;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public double getScore() {
        return score;
    }

    public void setScore(double score) {
        this.score = score;
    }

    public JavaStu(int id, String name, int age, String city, double score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.city = city;
        this.score = score;
    }

    public JavaStu() {
    }

    @Override
    public String toString() {
        return "JavaStu{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", city='" + city + '\'' +
                ", score=" + score +
                '}';
    }
}
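
If a typed Dataset is preferred over an untyped DataFrame, the same bean class can be paired with a bean encoder. This is a minimal sketch, not part of the original demo, and it assumes the spark session and rddJavaStu from the Scala code above:

import org.apache.spark.sql.{Dataset, Encoders}

// Sketch: build a typed Dataset[JavaStu] from the same RDD using a bean encoder
val ds: Dataset[JavaStu] = spark.createDataset(rddJavaStu)(Encoders.bean(classOf[JavaStu]))
ds.printSchema()
ds.show()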

Creating a DataFrame from an RDD of a plain Scala class

Note: a "plain class" here means a Scala class that is not a case class. The framework treats such a class as a standard Java bean under the hood, but a plain Scala class does not expose Java-style getters and setters for its fields, so the conversion fails. This can be fixed as shown below, by annotating the constructor parameters with @BeanProperty.

package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Create a DataFrame from an RDD[Scala bean]
 */
object Demo5_CreateDF_4 {
  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._


    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    val rddScalaStu: RDD[ScalaStu] = rdd.map(
      line => {
        val arr: Array[String] = line.split(",")
        new ScalaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)

      }
    )
    val df: DataFrame = spark.createDataFrame(rddScalaStu,classOf[ScalaStu])
    df.printSchema()
    df.show(50,false)
    spark.close()
  }

}
----------------
package day01

import scala.beans.BeanProperty

class ScalaStu (
               @BeanProperty
               val id : Int ,
               @BeanProperty
               val name : String ,
               @BeanProperty
               val age : Int ,
               @BeanProperty
               val city : String,
               @BeanProperty
               val score : Double
               )
object ScalaStu {
  def apply(id: Int, name: String, age: Int, city: String, score: Double): ScalaStu = new ScalaStu(id, name, age, city, score)
}


Creating a DataFrame from an RDD[Row]

Note: the data in a DataFrame is still, at bottom, wrapped in an RDD, and an RDD[T] always has some element type T; for the RDD inside a DataFrame, that element type is the framework-defined Row type.

package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Create a DataFrame from an RDD[Row]
 *   Row is Spark SQL's unified wrapper for a record (a predefined data-encapsulation class)
 *   Any data must first be turned into Rows before it can become a DataFrame
 */
object Demo6_CreateDF_5 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._


    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn RDD[String] into RDD[Row]
    val rdd2: RDD[Row] = rdd
      .map(line => {
        val arr: Array[String] = line.split(",")
        Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
      })
    // Build the schema description (the table definition)
    val schema: StructType = new StructType(Array(
      new StructField("id", DataTypes.IntegerType),
      new StructField("name", DataTypes.StringType),
      new StructField("age", DataTypes.IntegerType),
      new StructField("city", DataTypes.StringType),
      new StructField("score", DataTypes.DoubleType)
    ))
    // RDD[Row] + schema ==> DataFrame
    val df: DataFrame = spark.createDataFrame(rdd2,schema)
    df.printSchema()
    df.show()
    spark.close()

  }

}
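
The reverse direction illustrates the same point: df.rdd hands back the wrapped RDD[Row], and fields can be read by name or by position. A minimal sketch, reusing the df built in the demo above:

// Sketch: getting the underlying RDD[Row] back out of the DataFrame
val rowsBack: RDD[Row] = df.rdd
rowsBack
  .map(row => (row.getAs[String]("name"), row.getAs[Double]("score")))
  .take(5)
  .foreach(println)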

Creating DataFrames from RDDs of Seq, Set, and Map

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo7_CreateDF_SetSeqMap {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtil.getSpark()
    val seq1: Seq[Int] = Seq(1,2,3,4)
    val seq2 = Seq(11,22,33,44)
    /**
     * Encoding/decoding an RDD of Seq values
     */
    val rdd: RDD[Seq[Int]] = spark.sparkContext.parallelize(List(seq1,seq2))
    import spark.implicits._
    val df = rdd.toDF()
    df.printSchema()
    df.show()
    df.selectExpr("value[0]","size(value)").show()
    /**
     * root
     * |-- value: array (nullable = true)
     * |    |-- element: integer (containsNull = false)
     *
     *
     * +--------+-----------+
     * |value[0]|size(value)|
     * +--------+-----------+
     * |       1|          4|
     * |      11|          4|
     * +--------+-----------+
     */

    /**
     * Encoding/decoding an RDD of Set values
     */

    val set1: Set[String] = Set("a","b")
    val set2 = Set("c","d","e")
    val rdd1: RDD[Set[String]] = spark.sparkContext.parallelize(List(set1,set2))
    val df2 = rdd1.toDF()
    df2.printSchema()
    df2.show()

    /**
     * Encoding/decoding an RDD of Map values
     */
    val map1 = Map("father"->"mayun","mother"->"tangyan")
    val map2 = Map("father"->"huateng","mother"->"yifei","brother"->"sicong")
    val rdd3: RDD[Map[String, String]] = spark.sparkContext.parallelize(List(map1,map2))
    val df3: DataFrame = rdd3.toDF("jiaren")
    df3.printSchema()

    /**
     * root
     * |-- jiaren: map (nullable = true)
     * |    |-- key: string
     * |    |-- value: string (valueContainsNull = true)
     */
    df3.show()
    /***
     * +--------------------+
     * |              jiaren|
     * +--------------------+
     * |[father -> mayun,...|
     * |[father -> huaten...|
     * +--------------------+
     */
    df3.selectExpr("jiaren['mother']","size(jiaren)","map_keys(jiaren)","map_values(jiaren)").show(10,false)

    /**
     * +--------------+------------+-------------------------+------------------------+
     * |jiaren[mother]|size(jiaren)|map_keys(jiaren)         |map_values(jiaren)      |
     * +--------------+------------+-------------------------+------------------------+
     * |tangyan       |2           |[father, mother]         |[mayun, tangyan]        |
     * |yifei         |3           |[father, mother, brother]|[huateng, yifei, sicong]|
     * +--------------+------------+-------------------------+------------------------+
     */
    spark.close()
  }

}

-------
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkUtil {
  def getSpark(app:String = "demo", master : String = "local[*]") : SparkSession = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark: SparkSession = SparkSession
      .builder()
      .appName(app)
      .master(master)
      .getOrCreate()
    spark

  }

}
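
To flatten these collection columns into one row per element, Spark SQL's built-in explode function works on both the array column and the map column. A minimal sketch, reusing df and df3 from the demo above:

// Sketch: exploding the array column and the map column
df.selectExpr("explode(value) as element").show()
df3.selectExpr("explode(jiaren)").show(10, false)   // produces two columns: key, value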