当前位置:   article > 正文

Spark创建Dataframe的方法_java 方法来将字符串数据转换为rdd,或者使用 createdataframe

java 方法来将字符串数据转换为rdd,或者使用 createdataframe
  1. 通过RDD创建dataframe的方式1: 把rdd[T]变成 RDD[case class类型]就可以直接toDF
  2. 通过RDD[tuple]创建dataframe
  3. 通过RDD[JavaBean]创建dataframe
  4. 通过RDD[scala bean] 创建dataframe
  5. 通过 RDD[Row] 来创建dataframe

1.通过RDD创建dataframe

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * @description:
  *   通过RDD创建dataframe的方式1: 把rdd[T]变成 RDD[case class类型]就可以直接toDF
 */

// 1,张飞,21,北京,80.0
case class Stu(id:Int,name:String,age:Int,city:String,score:Double)

object Demo2_CreateDF_1 {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // 加载不含schema信息的数据为一个RDD
    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // 将rdd[String]转成rdd[Stu]
    val rddStu: RDD[Stu] = rdd
      .map(line=>line.split(","))
      .map(arr=>Stu(arr(0).toInt,arr(1),arr(2).toInt,arr(3),arr(4).toDouble))


    // 然后就可以将这种装着case class类型数据的RDD直接转成dataframe了
    val df = spark.createDataFrame(rddStu)
    df.printSchema()
    df.show()

    // 另一种更简洁的方式:需要导入隐式转换:  import spark.implicits._
    // 也是更常用的方式
    val df2: DataFrame = rddStu.toDF()

    spark.close()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

2. 通过RDD[tuple]创建dataframe

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * @description:
  *    通过RDD[tuple]创建dataframe
 */
object Demo3_CreateDF_2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("demo3")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // 将RDD[String] 变成RDD[(f1,f2,f3,....)]
    val addTuple: RDD[(Int, String, Int, String, Double)] = rdd.map(_.split(",")).map(arr=>(arr(0).toInt,arr(1),arr(2).toInt,arr(3),arr(4).toDouble))

    // 将RDD[(f1,f2,f3,....)] 转换成dataframe即可,框架能从tuple中自动推断出结构

    // val df = addTuple.toDF()
    /**
      * root
      * |-- _1: integer (nullable = false)
      * |-- _2: string (nullable = true)
      * |-- _3: integer (nullable = false)
      * |-- _4: string (nullable = true)
      * |-- _5: double (nullable = false)
      */

    // 在转换时,可以对dataframe中的字段,传入一套自定义的字段名
    val df = addTuple.toDF("id","name","age","city","score")


    // 打印schema
    df.printSchema()
    /**
      * root
      * |-- id: integer (nullable = false)
      * |-- name: string (nullable = true)
      * |-- age: integer (nullable = false)
      * |-- city: string (nullable = true)
      * |-- score: double (nullable = false)
      */

    // 打印数据
    df.show()
    spark.close()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

3. 通过RDD[JavaBean]创建dataframe

先创建对应的javabean用于封装数据 代码实现
/**
 * @description:  一个用于封装stu数据的javabean
 */
// 1,张飞,21,北京,80.0
public class JavaStu {
    int id;
    String name;
    int age;
    String city;
    double score;

    public JavaStu() {
    }

    public JavaStu(int id, String name, int age, String city, double score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.city = city;
        this.score = score;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    public double getScore() {
        return score;
    }

    public void setScore(double score) {
        this.score = score;
    }

    @Override
    public String toString() {
        return "JavaStu{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", city='" + city + '\'' +
                ", score=" + score +
                '}';
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
创建DataFrame 代码实现
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * @description:
  * 通过RDD[JavaBean]创建dataframe
  * get
  * set
  */
object Demo4_CreateDF_3 {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("demo3")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._


    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // RDD[String] 变成  RDD[JavaStu]
    val rddJavaStu: RDD[JavaStu] = rdd.map(line=>{

      val arr = line.split(",")

      new JavaStu(arr(0).toInt,arr(1),arr(2).toInt,arr(3),arr(4).toDouble)

    })

    // 将 RDD[JavaStu] 变成 dataframe
    val df = spark.createDataFrame(rddJavaStu,classOf[JavaStu])

    df.printSchema()
    df.show()

    spark.close()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

4. 通过RDD[scala bean] 创建dataframe

先创建对应的scalabean用于封装数据 代码实现
import scala.beans.BeanProperty

/**
 * @description: scala版本的bean类
 */
class ScalaStu(
  @BeanProperty
  val id:Int,

  @BeanProperty
  val name:String,

  @BeanProperty
  val age:Int,

  @BeanProperty
  val city:String,

  @BeanProperty
  val score:Double
)

object ScalaStu{

  def apply(
             id: Int,
             name: String,
             age: Int,
             city: String,
             score: Double
           ): ScalaStu = new ScalaStu(id, name, age, city, score)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
创建DataFrame 代码实现
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * 通过RDD[scala bean] 创建dataframe
  *
  */
object Demo5_CreateDF_4 {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("demo3")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")

    // RDD[String] 变成  RDD[ScalaStu]
    val rdd2: RDD[ScalaStu] = rdd.map(line=>{
      val arr = line.split(",")
      val bean = ScalaStu(arr(0).toInt,arr(1),arr(2).toInt,arr(3),arr(4).toDouble)

      bean
    })

    // 用RDD[ScalaStu]创建dataframe
    val df: DataFrame = spark.createDataFrame(rdd2,classOf[ScalaStu])

    df.printSchema()
    df.show()

    spark.close()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

5. 通过 RDD[Row] 来创建dataframe

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * @description:
  *    通过 RDD[Row] 来创建dataframe
  *    Row是sparksql中对数据的统一封装形式(内置定义好的一个数据封装类)
  *    任何数据要变成dataframe,都得先把数据变成Row
 */
object Demo6_CreateDF_5 {

  def main(args: Array[String]): Unit = {
  
    Logger.getLogger("org").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("demo3")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._


    val rdd: RDD[String] = spark.sparkContext.textFile("data/stu.txt")


    // RDD[String]  变成 RDD[Row]
    val rdd2: RDD[Row] = rdd.map(line=>{
      val arr = line.split(",")
      Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3),arr(4).toDouble)
    })

    //  构造一个数据描述(表定义)
    val schema = new StructType(Array(
      new StructField("id",DataTypes.IntegerType),
      new StructField("name",DataTypes.StringType),
      new StructField("age",DataTypes.IntegerType),
      new StructField("city",DataTypes.StringType),
      new StructField("score",DataTypes.DoubleType)
    ))

    // 用RDD[Row] + 数据描述  ==》 构造一个dataframe
    val df = spark.createDataFrame(rdd2,schema)

    df.printSchema()
    df.show()

    // df.where("socre>85").select("id","name")

    spark.close()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

6. SetSeqMap 类型数据rdd的编解码

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo7_CreateDF_SetSeqMap {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("").master("local[*]").getOrCreate()

    val seq1 = Seq(1,2,3,4)
    val seq2 = Seq(11,22,33,44)

    val rdd: RDD[Seq[Int]] = spark.sparkContext.parallelize(List(seq1,seq2))

    import spark.implicits._
    val df = rdd.toDF()

    df.printSchema()
    df.show()

    df.selectExpr("value[0]","size(value)").show()


    /**
      * set类型数据rdd的编解码
      */
    val set1 = Set("a","b")
    val set2 = Set("c","d","e")
    val rdd2: RDD[Set[String]] = spark.sparkContext.parallelize(List(set1,set2))

    val df2 = rdd2.toDF("members")
    df2.printSchema()
    df2.show()


    /**
      * map类型数据rdd的编解码
      */

    val map1 = Map("father"->"mayun","mother"->"tangyan")
    val map2 = Map("father"->"huateng","mother"->"yifei","brother"->"sicong")
    val rdd3: RDD[Map[String, String]] = spark.sparkContext.parallelize(List(map1,map2))

    val df3 = rdd3.toDF("jiaren")
    df3.printSchema()
    df3.show()

    df3.selectExpr("jiaren['mother']","size(jiaren)","map_keys(jiaren)","map_values(jiaren)")
        .show(10,false)

    spark.close()

  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小丑西瓜9/article/detail/693321
推荐阅读
相关标签
  

闽ICP备14008679号