scala spark 创建DataFrame的多种方式 (Multiple ways to create a DataFrame in Scala Spark)
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Builds a DataFrame by pairing an RDD[Row] with an explicitly declared
 * StructType schema (column names + types).
 */
object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    // Silence Spark's verbose logging for a cleaner console.
    Logger.getLogger("org").setLevel(Level.OFF)

    // Local Spark entry point using all available cores.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    println("1. 通过RDD[Row]和StructType创建")

    // Raw "name,age,sex" records.
    val rawLines: RDD[String] =
      spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))

    // Parse each record into a Row(name: String, age: Int, sex: String).
    val rows: RDD[Row] = rawLines.map { line =>
      val fields = line.split(",")
      Row(fields(0), fields(1).toInt, fields(2))
    }

    // Schema must match the Row contents positionally (name/type per column).
    val schema: StructType = StructType(
      List(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("sex", StringType)
      )
    )

    // Combine rows + schema into a DataFrame and display it.
    val dataFrame: DataFrame = spark.createDataFrame(rows, schema)
    dataFrame.show()

    // Release cluster resources.
    spark.stop()
  }
}

输出结果
- 1. 通过RDD[Row]和StructType创建
- +----+---+---+
- |name|age|sex|
- +----+---+---+
- | X| 22| M|
- | y| 21| W|
- | N| 22| M|
- +----+---+---+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creates a DataFrame directly from an RDD of tuples with rdd.toDF(columnNames*).
 *
 * FIX: the original listing used Logger/Level but was missing the
 * org.apache.log4j import, so it did not compile; the import is added above.
 */
object DataFrameDemo2 {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's noisy logging.
    Logger.getLogger("org").setLevel(Level.OFF)

    // Local Spark entry point.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    println("2. 直接通过Rdd.toDF()创建DataFrame")

    // Raw "name,age,sex" records.
    val sparkRdd3: RDD[String] =
      spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))

    // Parse each record into a (name, age, sex) tuple so toDF can infer types.
    val toDFRdd: RDD[(String, Int, String)] = sparkRdd3.map { line =>
      val fields = line.split(",")
      (fields(0), fields(1).toInt, fields(2))
    }

    import org.apache.spark.sql.functions._
    import spark.implicits._ // required for the .toDF extension method

    // Two equivalent ways to name the columns:
    //   1) toDFRdd.toDF("name", "age", "sex")
    //   2) expand an Array of names as varargs (used below).
    val columnNames = Array("name", "age", "sex")
    val frame: DataFrame = toDFRdd.toDF(columnNames: _*)

    // NOTE: this computes the SUM of ages (65), not an average — the "avg_age"
    // alias is misleading but kept so the printed output stays unchanged.
    frame.agg(sum("age") as "avg_age").show()

    // Release cluster resources.
    spark.stop()
  }
}

输出结果
- 2. 直接通过Rdd.toDF()创建DataFrame
- +-------+
- |avg_age|
- +-------+
- | 65|
- +-------+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creates a DataFrame from an RDD of case-class (Scala bean) instances,
 * then queries it through a temporary SQL view.
 */
object DataFrameDemo3 {
  def main(args: Array[String]): Unit = {
    // Keep the console output clean.
    Logger.getLogger("org").setLevel(Level.OFF)

    // Local Spark entry point.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    println("3. 通过RDD和ScalaBean创建DataFrame")

    val rawLines: RDD[String] =
      spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))

    // Parse every "name,age,sex" record into a Per instance.
    val people: RDD[Per] = rawLines.map { line =>
      val fields = line.split(",")
      Per(fields(0), fields(1).toInt, fields(2))
    }

    // The implicits import provides .toDF; column names come from the
    // case-class field names.
    import spark.implicits._
    val df: DataFrame = people.toDF

    // Register a session-scoped view so the data can be queried with SQL.
    df.createTempView("t_per")
    val res: DataFrame = spark.sql("SELECT name,age FROM t_per ORDER BY age")
    res.show()

    // Release cluster resources.
    spark.stop()
  }

  /** Scala bean (case class) backing the DataFrame's schema. */
  case class Per(name: String, age: Int, sex: String)
}

输出结果
- 3. 通过RDD和ScalaBean创建DataFrame
- +----+---+
- |name|age|
- +----+---+
- | y| 21|
- | N| 22|
- | X| 22|
- +----+---+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creates a DataFrame from an RDD of JavaBean instances via
 * spark.createDataFrame(rdd, beanClass); columns are derived from the
 * bean's getter methods (hence the alphabetical age/name/sex order in the output).
 *
 * FIX: the original listing used Logger/Level but was missing the
 * org.apache.log4j import, so it did not compile; the import is added above.
 */
object DataFrameDemo4 {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's noisy logging.
    Logger.getLogger("org").setLevel(Level.OFF)

    // Local Spark entry point.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    println("4. 通过RDD联合JavaBean创建DataFrame")

    // Raw "name,age,sex" records.
    val sparkRdd4: RDD[String] =
      spark.sparkContext.parallelize(List("X,22,M", "y,21,W", "N,22,M"))

    // Parse each record into a Person JavaBean.
    val javaBeanRdd: RDD[Person] = sparkRdd4.map { line =>
      val fields = line.split(",")
      new Person(fields(0), fields(1).toInt, fields(2))
    }

    // Schema is inferred from Person's getters.
    val frame1: DataFrame = spark.createDataFrame(javaBeanRdd, classOf[Person])
    frame1.show()

    // Release cluster resources.
    spark.stop()
  }
}

Java bean class
/**
 * Plain JavaBean describing a person. Spark's
 * {@code createDataFrame(rdd, Person.class)} derives the DataFrame schema
 * (column names and types) from this class's getter methods.
 */
public class Person {

    private String name;
    private int age;
    private String sex;

    /**
     * Creates a fully-initialized person.
     *
     * @param name the person's name
     * @param age  the person's age in years
     * @param sex  the person's sex code (e.g. "M"/"W" in the demo data)
     */
    public Person(String name, int age, String sex) {
        this.name = name;
        this.age = age;
        this.sex = sex;
    }

    /** @return the person's name */
    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    /** @return the person's age in years */
    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /** @return the person's sex code */
    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }
}

输出结果
- 4. 通过RDD联合JavaBean创建DataFrame
- +---+----+---+
- |age|name|sex|
- +---+----+---+
- | 22| X| M|
- | 21| y| W|
- | 22| N| M|
- +---+----+---+
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Creates a DataFrame from an RDD[Row] that is first mapped to a JavaBean
 * (NumberWord), then converted with spark.createDataFrame(rdd, beanClass).
 *
 * FIX: the original listing used Logger/Level and Row without importing
 * them, so it did not compile; both imports are added above.
 */
object DataFrameDemo5 {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's noisy logging.
    Logger.getLogger("org").setLevel(Level.OFF)

    // Local Spark entry point.
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    println("5、手写的示例5")

    val rdd11 = spark.sparkContext.parallelize(Seq(
      Row(8, "bat"),
      Row(64, "mouse"),
      Row(-27, "horse")
    ))
    rdd11.foreach(println)

    // Map each Row to a NumberWord bean, adding 100 to the number column.
    val newRDD: RDD[NumberWord] = rdd11.map { row =>
      convertNumberWordDTO(row.getInt(0) + 100, row.getString(1))
    }

    // Schema is inferred from NumberWord's getters.
    val ds: DataFrame = spark.createDataFrame(newRDD, classOf[NumberWord])
    ds.show(100, false)

    // Inspect the inferred schema:
    //   StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
    println(ds.schema)
    ds.schema.printTreeString()
    // root
    //  |-- number: integer (nullable = false)
    //  |-- word: string (nullable = true)

    // Release cluster resources.
    spark.stop()
  }

  /** Builds a NumberWord bean from its two field values. */
  def convertNumberWordDTO(number: Int, word: String): NumberWord = {
    val dto = new NumberWord()
    dto.setNumber(number)
    dto.setWord(word)
    dto
  }
}

Java bean class
/**
 * Plain JavaBean pairing an int with a word. Spark's
 * {@code createDataFrame(rdd, NumberWord.class)} derives the DataFrame schema
 * (columns "number" and "word") from this class's getter methods.
 */
public class NumberWord {

    private int number;
    private String word;

    /** No-arg constructor (required for bean-style instantiation). */
    public NumberWord() {
    }

    /**
     * Creates a fully-initialized instance.
     *
     * @param number the numeric value
     * @param word   the associated word
     */
    public NumberWord(int number, String word) {
        this.number = number;
        this.word = word;
    }

    /** @return the numeric value */
    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    /** @return the associated word */
    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}

输出结果
- 5、手写的示例5
- [8,bat]
- [64,mouse]
- [-27,horse]
- +------+-----+
- |number|word |
- +------+-----+
- |108 |bat |
- |164 |mouse|
- |73 |horse|
- +------+-----+
-
- StructType(StructField(number,IntegerType,false), StructField(word,StringType,true))
- root
- |-- number: integer (nullable = false)
- |-- word: string (nullable = true)

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。