In Spark SQL, SparkSession is the entry point for creating DataFrames and executing SQL. There are three ways to create a DataFrame:
(1) Convert from an existing RDD
(2) Create from a structured data source such as JSON/Parquet/CSV/JDBC (see the sketch after this list)
(3) Return the result of a query against a Hive table
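Ways (2) and (3) go through the reader API rather than an RDD. A minimal sketch, assuming a SparkSession named spark, hypothetical input paths, and, for the Hive case, a session built with enableHiveSupport() and an existing table named stu:

val jsonDF    = spark.read.json("data/stu.json")       // schema inferred from the JSON documents
val parquetDF = spark.read.parquet("data/stu.parquet") // schema read from Parquet file metadata
val csvDF     = spark.read
  .option("header", "true")      // first line holds the column names
  .option("inferSchema", "true") // sample the data to infer column types
  .csv("data/doit_stu.csv")
val hiveDF    = spark.sql("select * from stu")         // way (3): query an existing Hive table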
The core idea: a DataFrame is "an RDD + schema metadata". The RDD comes from the data; the schema can either be defined by the developer or inferred from the data by the framework.
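Every demo below parses the same file, data/doit_stu.csv, into the five fields id, name, age, city, score. The file's contents are not shown in the original, but a couple of hypothetical lines in that shape make the parsing code easier to follow:

1,zhangsan,18,beijing,88.5
2,lisi,20,shanghai,92.0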
package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Creating a DataFrame from an RDD, way 1:
 * turn the RDD[T] into an RDD of a case class type and it can be converted with toDF directly
 */
case class Stu(id: Int, name: String, age: Int, city: String, score: Double)

object Demo2_CreateDF_1 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // Load the schema-less data as an RDD
    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    import spark.implicits._

    // Turn the RDD[String] into an RDD[Stu]
    val rddStu: RDD[Stu] = rdd
      .map(line => line.split(","))
      .map(arr => Stu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // An RDD holding case class instances can then be turned into a DataFrame directly
    val res: DataFrame = spark.createDataFrame(rddStu)
    res.printSchema()
    res.show(50, false)

    // A more concise (and more common) way; requires the implicit conversions: import spark.implicits._
    val df2: DataFrame = rddStu.toDF()

    spark.close()
  }
}
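Because Stu is a case class, the same implicits can also produce a typed Dataset instead of an untyped DataFrame. A minimal sketch, reusing rddStu and the import spark.implicits._ from the demo above:

// toDS() yields a Dataset[Stu], preserving the compile-time element type
val ds: Dataset[Stu] = rddStu.toDS()
ds.printSchema() // same schema: id, name, age, city, score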
package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creating a DataFrame from an RDD[tuple]
 */
object Demo3_CreateDF_2 {
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn the RDD[String] into an RDD[(f1, f2, f3, ...)]
    val rddTuple: RDD[(Int, String, Int, String, Double)] =
      rdd.map(_.split(",")).map(arr => (arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble))

    // Converting the RDD[(f1, f2, f3, ...)] is enough: the framework infers the structure
    // from the tuple. Without explicit names the columns default to _1, _2, ...:
    // val df = rddTuple.toDF()
    /**
     * root
     *  |-- _1: integer (nullable = false)
     *  |-- _2: string (nullable = true)
     *  |-- _3: integer (nullable = false)
     *  |-- _4: string (nullable = true)
     *  |-- _5: double (nullable = false)
     */
    val df: DataFrame = rddTuple.toDF("id", "name", "age", "city", "score")
    df.printSchema()
    df.show(50, false)

    spark.close()
  }
}
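The tuple encoding is not limited to RDDs: with the implicits in scope, a local Seq of tuples converts the same way, which is handy for quick experiments. A minimal sketch with hypothetical inline rows:

val quickDF: DataFrame = Seq(
  (1, "a", 18, "beijing", 90.0), // hypothetical test rows
  (2, "b", 20, "shanghai", 85.5)
).toDF("id", "name", "age", "city", "score")
quickDF.show()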
package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Creating a DataFrame from an RDD[JavaBean]
 * What is a JavaBean? A class of the form:
 *   type field1
 *   type field2
 *   getters
 *   setters
 */
object Demo4_CreateDF_3 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn the RDD[String] into an RDD[JavaStu]
    val rddJavaStu: RDD[JavaStu] = rdd.map(line => {
      val arr = line.split(",")
      new JavaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })

    // Turn the RDD[JavaStu] into a DataFrame
    val df = spark.createDataFrame(rddJavaStu, classOf[JavaStu])
    df.printSchema()
    df.show()

    spark.close()
  }
}

------------------------------

package day01;

public class JavaStu {
    int id;
    String name;
    int age;
    String city;
    double score;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getAge() { return age; }
    public void setAge(int age) { this.age = age; }
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
    public double getScore() { return score; }
    public void setScore(double score) { this.score = score; }

    public JavaStu(int id, String name, int age, String city, double score) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.city = city;
        this.score = score;
    }

    public JavaStu() {}

    @Override
    public String toString() {
        return "JavaStu{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", city='" + city + '\'' +
                ", score=" + score +
                '}';
    }
}
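If a typed Dataset[JavaStu] is wanted rather than a DataFrame, an explicit bean encoder can be passed instead. A minimal sketch, reusing spark and rddJavaStu from the demo above:

import org.apache.spark.sql.{Dataset, Encoders}

// Encoders.bean derives an encoder from the bean's getters/setters;
// JavaStu qualifies because it has a no-arg constructor and full accessors
val ds: Dataset[JavaStu] = spark.createDataset(rddJavaStu)(Encoders.bean(classOf[JavaStu]))
ds.printSchema()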
Note: a "plain class" here means a Scala class that is not a case class. Under the hood the framework treats such a class as a standard Java bean, but a plain Scala class does not come with Java-standard getters and setters for its fields, so the conversion fails. This can be fixed as follows:
package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Creating a DataFrame from an RDD[Scala bean]
 */
object Demo5_CreateDF_4 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    val rddScalaStu: RDD[ScalaStu] = rdd.map(line => {
      val arr: Array[String] = line.split(",")
      new ScalaStu(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })

    val df: DataFrame = spark.createDataFrame(rddScalaStu, classOf[ScalaStu])
    df.printSchema()
    df.show(50, false)

    spark.close()
  }
}

----------------

package day01

import scala.beans.BeanProperty

// @BeanProperty makes the compiler generate Java-style getters
// (and setters for vars), which is what createDataFrame's bean reflection looks for
class ScalaStu(
  @BeanProperty val id: Int,
  @BeanProperty val name: String,
  @BeanProperty val age: Int,
  @BeanProperty val city: String,
  @BeanProperty val score: Double
)

object ScalaStu {
  def apply(id: Int, name: String, age: Int, city: String, score: Double): ScalaStu =
    new ScalaStu(id, name, age, city, score)
}
Note: the data in a DataFrame is, at bottom, still held in an RDD, and an RDD[T] always has an element type T. For the RDD inside a DataFrame, that element type T is the framework-defined Row type.
package day01

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
 * Creating a DataFrame from an RDD[Row]
 * Row is Spark SQL's unified wrapper around data (a built-in data-holding class).
 * Whatever the data, to become a DataFrame it is first turned into Rows.
 */
object Demo6_CreateDF_5 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val rdd: RDD[String] = spark.sparkContext.textFile("data/doit_stu.csv")

    // Turn the RDD[String] into an RDD[Row]
    val rdd2: RDD[Row] = rdd.map(line => {
      val arr: Array[String] = line.split(",")
      Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3), arr(4).toDouble)
    })

    // Build a data description (a table definition)
    val schema: StructType = new StructType(Array(
      new StructField("id", DataTypes.IntegerType),
      new StructField("name", DataTypes.StringType),
      new StructField("age", DataTypes.IntegerType),
      new StructField("city", DataTypes.StringType),
      new StructField("score", DataTypes.DoubleType)
    ))

    // RDD[Row] + data description ==> DataFrame
    val df: DataFrame = spark.createDataFrame(rdd2, schema)
    df.printSchema()
    df.show()

    spark.close()
  }
}
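The reverse direction makes the note above concrete: df.rdd hands back the underlying RDD[Row], whose fields can be read by position or by name. A minimal sketch, reusing df from the demo above:

// the DataFrame's data really is an RDD[Row]
val backToRows: RDD[Row] = df.rdd
backToRows.take(3).foreach { row =>
  val id   = row.getInt(0)             // access by position
  val name = row.getAs[String]("name") // access by field name
  println(s"$id -> $name")
}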
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo7_CreateDF_SetSeqMap {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtil.getSpark()

    val seq1: Seq[Int] = Seq(1, 2, 3, 4)
    val seq2 = Seq(11, 22, 33, 44)

    /**
     * Encoding/decoding of Seq-typed data in an RDD
     */
    val rdd: RDD[Seq[Int]] = spark.sparkContext.parallelize(List(seq1, seq2))
    import spark.implicits._
    val df = rdd.toDF()
    df.printSchema()
    df.show()
    df.selectExpr("value[0]", "size(value)").show()
    /**
     * root
     *  |-- value: array (nullable = true)
     *  |    |-- element: integer (containsNull = false)
     *
     * +--------+-----------+
     * |value[0]|size(value)|
     * +--------+-----------+
     * |       1|          4|
     * |      11|          4|
     * +--------+-----------+
     */

    /**
     * Encoding/decoding of Set-typed data in an RDD
     */
    val set1: Set[String] = Set("a", "b")
    val set2 = Set("c", "d", "e")
    val rdd1: RDD[Set[String]] = spark.sparkContext.parallelize(List(set1, set2))
    val df2 = rdd1.toDF()
    df2.printSchema()
    df2.show()

    /**
     * Encoding/decoding of Map-typed data in an RDD
     */
    val map1 = Map("father" -> "mayun", "mother" -> "tangyan")
    val map2 = Map("father" -> "huateng", "mother" -> "yifei", "brother" -> "sicong")
    val rdd3: RDD[Map[String, String]] = spark.sparkContext.parallelize(List(map1, map2))
    val df3: DataFrame = rdd3.toDF("jiaren")
    df3.printSchema()
    /**
     * root
     *  |-- jiaren: map (nullable = true)
     *  |    |-- key: string
     *  |    |-- value: string (valueContainsNull = true)
     */
    df3.show()
    /**
     * +--------------------+
     * |              jiaren|
     * +--------------------+
     * |[father -> mayun,...|
     * |[father -> huaten...|
     * +--------------------+
     */
    df3.selectExpr("jiaren['mother']", "size(jiaren)", "map_keys(jiaren)", "map_values(jiaren)").show(10, false)
    /**
     * +--------------+------------+-------------------------+------------------------+
     * |jiaren[mother]|size(jiaren)|map_keys(jiaren)         |map_values(jiaren)      |
     * +--------------+------------+-------------------------+------------------------+
     * |tangyan       |2           |[father, mother]         |[mayun, tangyan]        |
     * |yifei         |3           |[father, mother, brother]|[huateng, yifei, sicong]|
     * +--------------+------------+-------------------------+------------------------+
     */

    spark.close()
  }
}

-------

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkUtil {
  def getSpark(app: String = "demo", master: String = "local[*]"): SparkSession = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val spark: SparkSession = SparkSession
      .builder()
      .appName(app)
      .master(master)
      .getOrCreate()
    spark
  }
}
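Note that a Set column is encoded as the same array type as a Seq, so the array functions shown earlier also work on df2. A minimal sketch (size and array_contains are built-in Spark SQL functions):

// the default column name of an un-named toDF() is "value"
df2.selectExpr("size(value)", "array_contains(value, 'a')").show()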