当前位置:   article > 正文

Spark SQL中的DataFrame_spark sql dataframe

spark sql dataframe

SparkCore基于RDD来计算,Spark SQL基于DataFrame来计算;
Dataframe,分布式的大表,一个分布式数据容器;包含列的schema(名称,属性);底层封装了RDD,泛型ROW类型;dataframe基于列式存储,每一列就是一个对象;好处:只需要读取需要的列的数据到内存,而不需要将所有列加载到内存,减少数据的冗余;
Spark SQL是Spark中的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象,就是DataFrame。同时Spark SQL还可以作为分布式的SQL查询引擎。 Spark SQL最重要的功能之一,就是从Hive中查询数据。
DataFrame,可以理解为是,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

1、创建DataFrame

在Spark SQL中SparkSession是创建DataFrames和执行SQL的入口。在Spark SQL中创建DataFrames有如下几种方式:

(1)从集合或者RDD中通过隐式转换创建;
(2)借助SparkSession对象的createDataFrame函数从集合或者RDD中创建;
(3)从Spark SQL支持的数据源创建(文件、关系型数据库等);
(4)从Hive数据仓库创建。

1.1 借助隐式转换从集合或者RDD创建DataFrame

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 1、通过隐式转换函数toDF()从Scala集合或者是RDD中创建DataFrame
 */
object DataFrameCreateDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("dataFrameCreate")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
     * 方式一:Scala集合创建DataFrame 集合最好是一个元组类型的数据
     * 集合中有一个方法toDF(字段名列表)
     * 但是集合中的toDF函数不存在,需要通过隐式转换来增强集合的执行方式
     * 引入转换函数是我们创建sparkSession对象中的,不能在上面导,而且导入时必须用创建的对象去引入,不能用SparkSession类引入
     */
    import sparkSession.implicits._
    // 左边为编译时类型,必须为Seq;右边为运行时类型,可以为Array、List
    val array:Seq[(String,Int,String)] = Array(("cy",32,"男"),("hr",23,"女"),("sq",32,"男"),("ly",35,"女"))
    val dataFrame:DataFrame = array.toDF("name", "age", "sex")
    dataFrame.show()

    /**
     * 方式二:通过RDD的toDF函数来创建DataFrame
     * RDD的创建需要依赖SparkContext, SparkContext不需要创建了,因为SparkSession中封装了一个SparkContext
     */
    //导入SparkSession中定义的隐式转换函数
    import sparkSession.implicits._
    val sparkContext = sparkSession.sparkContext
    val rdd:RDD[(String,Int,String)] = sparkContext.makeRDD(Array(("cy",32,"男"),("hr",23,"女"),("sq",32,"男"),("ly",35,"女")))
    val dataFrame1:DataFrame = rdd.toDF("name", "age", "sex")
    dataFrame1.show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

1.2 借助SparkSession对象的createDataFrame函数从集合或者RDD中创建

在SparkSession类中提供了一系列的createDataFrame重载方法,可以实现传入集合或者RDD实现DataFrame的创建
createDataFrame函数从集合中创建DataFrame:

函数函数解释
createDataFrame (data: Seq[A]): DataFrame从一个Scala的Seq集合中创建DataFrame,DataFrame的列名自动生成
createDataFrame(data: java.util.List[], beanClass: Class[]): DataFrame从一个Java的List集合以及JavaBean中创建DataFrame,列名为JavaBean的属性
createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame从一个Java的List[Row]集合以及StructType表结构体中创建DataFrame,列名从设置的StructType结构中获取

createDataFrame函数从RDD中创建DataFrame:

函数函数解释
createDataFrame (rdd: RDD[A]): DataFrame从一个RDD中创建DataFrame
createDataFrame(rdd: RDD[], beanClass: Class[]): DataFrame从一个RDD中以及JavaBean中创建DataFrame
createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame从一个RDD[Row]集合以及StructType表结构体中创建DataFrame
/**
 * 2、从集合或者RDD中创建DataFrame的第二种方式 使用SparkSession自带的createDataFrame重载函数完成
 */

import com.sun.prism.PixelFormat.DataType
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.beans.BeanProperty

object DataFrameCreateDemo01 {
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("dataFrameCreate")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    println("==============createDataFrameByList===============")
    createDataFrameByList(sparkSession)
    println("==============createDataFrameByRDD===============")
    createDataFrameByRDD(sparkSession)
  }

   /*
      1、从集合中借助createDataFrame函数创建DataFrame
         createDataFrame(Seq[T])     // 列名会自动生成
         createDataFrame(util.List[T],beanClass)    // 通过反射机制将beanClass中属性名当作List集合元组的列名    ——不常用
         createDataFrame(List[Row],StructType)
   */
  def createDataFrameByList(sparkSession: SparkSession): Unit = {

    val dataFrame:DataFrame = sparkSession.createDataFrame(Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女")))
    dataFrame.show()

    /**
     * 通过集合和反射类创建DataFrame 集合中放的元素最好是样例类元素
     */
    val list:java.util.List[Student] = new java.util.ArrayList[Student]()
    list.add(Student("cy", 32, "男"))
    list.add(Student("hr", 23, "女"))
    list.add(Student("sq", 32, "男"))
    list.add(Student("ly", 35, "女"))
    val dataFrame1:DataFrame = sparkSession.createDataFrame(list, classOf[Student])
    dataFrame1.show()

    val list1:java.util.List[Row] = new java.util.ArrayList[Row]()
    list1.add(Row("cy", 32, "男"))
    list1.add(Row("hr", 23, "女"))
    list1.add(Row("sq", 32, "男"))
    list1.add(Row("ly", 35, "女"))
    val structType:StructType = StructType(Array(
      StructField("name", DataTypes.StringType),
      StructField("age", DataTypes.IntegerType),
      StructField("sex", DataTypes.StringType)
    ))
    val dataFrame2:DataFrame = sparkSession.createDataFrame(list1, structType)
    dataFrame2.show()
  }

  /**
   * 2、从RDD中借助createDataFrame函数创建DataFrame
   * createDataFrame(RDD[T])   ——创建的DF列名是随机生成的
   * createDataFrame(RDD[T], beanClass)   ——借助反射创建DF
   * createDataFrame(RDD[Row], StructType)   ——根据数据和schema创建DataFrame
   */
  def createDataFrameByRDD(sparkSession: SparkSession): Unit ={

    // createDataFrame(RDD[T])   ——直接使用RDD创建DataFrame列名是随机生成的
//    val sparkContext = sparkSession.sparkContext
    val rdd:RDD[(String,Int,String)] = sparkSession.sparkContext.makeRDD(Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女")))
    val dataFrame:DataFrame = sparkSession.createDataFrame(rdd)
    dataFrame.show()

    // createDataFrame(RDD[T], beanClass)   ——借助反射创建DF  借助RDD和样例类完成DataFrame的创建,列名会使用样例类的类名
    /**
     * 限制:理论上,样例类前面的RDD是什么类型都无所谓,不报错可以运行,但是结果不对
     *     如果要使用样例类,RDD类型最好是样例类的类型,还得需要配置一些东西(第二个参数)
     *     样例类的反射机制去创建DataFrame,第二个参数传递的是一个Class对象。但是一定要注意
     *     传递的是一个Class对象的实例--样例类的Class实例对象。实例是这样获取的classOf[Student]
     *     千万不能使用Student.getClass去获取
     *
     * 【注意】scala中getClass和classOf的区别
     *     getClass获取回来的是Class这个类----String
     *     classOf获取的是Class的实例对象-----"zs"
     */
    val rdd1:RDD[(String,Int,String)] = sparkSession.sparkContext.makeRDD(Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女")))
    val rdd4:RDD[Student] = rdd1.map((tuple3: (String, Int, String)) => {
      Student(tuple3._1, tuple3._2, tuple3._3)
    })
    val dataFrame1:DataFrame = sparkSession.createDataFrame(rdd4, classOf[Student])
    dataFrame1.show()

    // createDataFrame(RDD[Row], StructType)   ——根据数据和schema创建DataFrame  借助RDD[Row]和StructType创建DataFrame
    val rdd2:RDD[(String,Int,String)] = sparkSession.sparkContext.makeRDD(Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女")))
    val rdd3:RDD[Row] = rdd2.map((tuple3: (String, Int, String)) => {
      Row(tuple3._1, tuple3._2, tuple3._3)
    })
    val structType:StructType = StructType(Array(
      StructField("name", DataTypes.StringType),
      StructField("age", DataTypes.IntegerType),
      StructField("sex", DataTypes.StringType)
    ))
    val dataFrame2:DataFrame = sparkSession.createDataFrame(rdd3,structType)
    dataFrame2.show()
  }
}

/**
 * case class是一个样例类,类似于Java中的JavaBean对象
 * Scala中样例类会自动生成一个伴生对象,生成伴生对象的apply方法,同时还会生成toString、hashCode、equals方法
 */
case class Student(@BeanProperty name:String, @BeanProperty age:Int, @BeanProperty sex:String)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111

1.3 从Spark SQL支持的数据源进行创建

Spark SQL支持从txt,csv,json,parquet等格式的外部结构化数据文件、以及关系型数据库等数据源读取数据创建DataFrame。一般情况下,我们需要将结构化数据文件存放到HDFS分布式文件存储系统。Spark SQL的SparkSession对象可以通过sparksession.read.format(指定的格式).load(文件的路径)或者sparksession.read.option(key,value).格式的名称(文件的路径)的方式从数据源中创建数据。一般常使用sparksession.read.option(key,value).格式的名称(文件的路径)方式创建。如spark.read .option(“header”,value=true) .option(“inferSchema”,value = true) .csv(“xxx.csv”)。
Option可选操作项
.option(“mode”, “FAILFAST”) // 读取模式
读取模式的常用值有
permissive 当遇到损坏的记录时,将其所有字段设置为 null,
dropMalformed 删除格式不正确的行
failFast 遇到格式不正确的数据时立即失败
.option(“inferSchema”, “true”) // 是否自动推断 schema
.option(“path”, “path/to/file(s)”) // 文件路径
.option(“header”, “false”) // 文件中的第一行是否为列的名称 CSV文件使用。

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object FileCreateDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkConf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("FileCreateDataFrame")
    val sparkSession:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
     * 1、从txt文件中创建DataFrame
     * 如果是从普通的文本文件创建DataFrame,文件中的列和列的分隔符我们不清楚,所以创建的DataFrame只有一列,一列就是一行数据
     * inferSchema:是否自动推断 schema
     */
    val map = Map[String,String]("mode"->"FAILFAST","inferSchema"->"true")
    val dataFrame:DataFrame = sparkSession.read.options(map).text("hdfs://node1:9000/student.txt")
    dataFrame.show()

    val dataFrame4 = sparkSession.read.options(map).format("text").text("hdfs://node1:9000/student.txt")
    println("使用第二种方式读取结构化文件")
    dataFrame4.show()

    /**
     * 2、从json格式文件中创建DataFrame---列名就是json对象的key值
     * {“name”:"zs","age":20,"sex":"男"}
     * spark sql读取json文件是有要求的,json文件只能存在json对象,每一个json对象之间以换行符分隔
     */
    val map1 = Map[String,String]("mode"->"dropMalformed","inferSchema"->"true")
    val dataFrame1 = sparkSession.read.options(map1).json("hdfs://node1:9000/student.json")
    dataFrame1.show()

    /**
     * 3、从csv格式文件创建DataFrame----大数据中很多数据都是以csv文件格式存储的。
     *    csv文件--是以,分隔一种文件格式,可以使用execl或者记事本打开的一种特殊的结构化数据文件
     *
     *    "header"->"true"  把第一列当作表格的列名来处理
     */
    val map2 = Map[String,String]("mode"->"dropMalformed","inferSchema"->"true","header"->"true")
    val dataFrame2 = sparkSession.read.options(map2).csv("hdfs://node1:9000/student.csv")
    dataFrame2.show()
    //spark sql结果输出方式
//    dataFrame2.write.mode("append").parquet("hdfs://node1:9000/parquet")

    /**
     * 4、从parquet格式文件创建DataFrame----也是我们常用的一种方式
     *    parquet格式文件Hive、Spark SQL、Flink都是支持的。列式存储文件格式  支持压缩 通过文件中还包含schema结构数据
     */
    val dataFrame3 = sparkSession.read.options(map1).parquet("hdfs://node1:9000/parquet/part-00001-949c5698-2d02-4b69-9e46-b37238590ae4-c000.snappy.parquet")
    dataFrame3.show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50

1.4 从关系型数据库创建DataFrame

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

import java.util.Properties

object MySQLToDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("MySQLToDataFrame")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val properties = new Properties();
    properties.put("driver","com.mysql.cj.jdbc.Driver")
    properties.put("user","用户名")
    properties.put("password","密码")
    val dataFrame = sparkSession.read.jdbc("jdbc:mysql://node1:3306/project?serverTimezone=UTC","area_pvs",properties)
    dataFrame.show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

1.5 通过Spark SQL on Hive 读取Hive中的数据创建

原理:spark通过sql操作hive(元数据存储在mysql,真实数据存储在HDFS),spark需要连接metastore(数据存储服务)来获取元数据,然后对真实数据进行操作;
Apache Hive是Hadoop上的SQL引擎,Spark SQL编译时可以包含Hive支持,也可以不包含。包含Hive支持的Spark SQL可以支持Hive表访问、UDF(用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的 一点是,如果要在Spark SQL中包含Hive的库,并不需要事先安装Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。
若要把Spark SQL连接到一个部署好的Hive上,你必须把hive-site.xml复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好Hive,Spark SQL也可以运行。 需要注意的是,如果你没有部署好Hive,Spark SQL会在当前的工作目录中创建出自己的Hive 元数据仓库,叫作 metastore_db。此外,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object HiveTableToDataFrame {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("demo")
    // 默认在本地创建,修改为HDFS上
    sparkConf.set("spark.sql.warehouse.dir", "hdfs://node1:9000/user/hive/warehouse")
    /**
     * 要开启Hive的支持 连接Hive操作Hive
     */
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // 查询Hive数据表构建DataFrame
    //    val dataFrame:DataFrame = sparkSession.sql("select * from project.area_pvs")
    val dataFrame1: DataFrame = sparkSession.sql("show databases")
    val dataFrame2: DataFrame = sparkSession.sql("create database test")
    //    dataFrame.show()
    dataFrame1.show()
    dataFrame2.show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

2、DataFrame操作数据的方式

2.1 DSL风格语法

DSL操作方式—通过DataFrame自带的一些算子(和SQL很像)来操作分析数据

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * DataFrame操作数据的第一种方式:DSL操作方式
 */
object DSLOper {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DSLOper")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val seq:Seq[Student] = Array(Student("cy",32,"男"),Student("hr",23,"女"),Student("sq",32,"男"))
    val rdd:RDD[Student] = sparkSession.sparkContext.makeRDD(seq)
    val dataFrame:DataFrame = sparkSession.createDataFrame(rdd, classOf[Student])

    // 打印DataFrame的结构structure
    dataFrame.printSchema()

    // 求不同性别的人数
    val dataset = dataFrame.groupBy("sex")
    val dataFrame1:DataFrame = dataset.count()
    dataFrame1.show()

//    val dataFrame2 = dataFrame.groupBy("sex").count().select("sex", "count")
//    val dataFrame2 = dataFrame.groupBy("sex").count().select("sex", "count").limit(1)
    val dataFrame2 = dataFrame.groupBy("sex").agg(Map("sex"->"count")).select("*")
    dataFrame2.show()

    // 求DataFrame中年龄最大 年龄最小 总人数 平均年龄
    dataFrame.groupBy("name", "age", "sex").agg(Map("age"->"max","age"->"min","*"->"count","age"->"avg")).select("*").show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

2.2 SQL风格语法

SQL操作方式—通过SQL语法或者HQL语句来操作处理分析数据

/**
 * DataFrame操作数据的第二种方式:SQL操作方式
 */

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.beans.BeanProperty

object SQLOper {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("SQLOper")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
    val seq:Seq[Student] = Array(Student("cy",32,"男"),Student("hr",23,"女"),Student("sq",32,"男"))
    val rdd:RDD[Student] = sparkSession.sparkContext.makeRDD(seq)
    val dataFrame:DataFrame = sparkSession.createDataFrame(rdd, classOf[Student])

    // 将DataFrame注册成为临时表
    dataFrame.createOrReplaceTempView("student")

    // 创建的是全局表的时候需要在查询语句中的表名前必须加global_temp
//    dataFrame.createOrReplaceGlobalTempView("student")
    // 通过SQL查询临时表数据——查询不同性别的人数
    val dataFrame1 = sparkSession.sql("select sex, count(*) as num from student group by sex")
//    val dataFrame1 = sparkSession.sql("select sex, count(*) as num from global_temp.student group by sex")
    dataFrame1.show()
  }
}

case class Student(@BeanProperty var name:String, @BeanProperty var age:Int, @BeanProperty var sex:String)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

3、DataFrame的保存操作

以结构化文件的格式保存数据、保存到Hive数据表存储目录、保存到JDBC。

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

import java.util.Properties

/**
 * DataFrame中保存操作
 */
object StorageDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("StorageDemo")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import sparkSession.implicits._
    val seq: Seq[(String, Int, String)] = Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女"))
    val dataFrame = seq.toDF("name", "age", "sex")

    /**
     * wirte.mode().saveAsTable("tableName") 将数据保存到某一个Hive数据表  严格意义上来说其实是将数据保存到spark.sql.warehouse.dir路径下
     * 而且如果我们这个环境配置了Hive支持,spark.sql.warehouse.dir路径就是我们指定的Hive的表数据存储路径
     * 默认情况下这个表不存在,会自动创建,如果这个表存在则会覆盖
     *
     * write.mode().csv/text/json/parquet/orc(路径)
     *
     * write.mode().format("xxxx").save()
     */
    dataFrame.createOrReplaceTempView("student")
    val dataFrame1 = sparkSession.sql("select * from student where age>25")
    dataFrame1.show()
    // 将数据写出到spark.sql.warehouse.dir路径下
//    dataFrame1.write.saveAsTable("student_info")
    // 表存在则追加
//    dataFrame1.write.mode(SaveMode.Append).saveAsTable("student_info")

    // 将数据以结构化文件写出
    dataFrame1.write.mode(SaveMode.Overwrite).csv("hdfs://node1:9000/sparksql/csv")
    dataFrame1.write.mode(SaveMode.Overwrite).json("hdfs://node1:9000/sparksql/json")
    dataFrame1.write.mode(SaveMode.Overwrite).parquet("hdfs://node1:9000/sparksql/parquet")
    dataFrame1.write.mode(SaveMode.Overwrite).orc("hdfs://node1:9000/sparksql/orc")

    val properties = new Properties()
    properties.setProperty("user","用户名")
    properties.setProperty("password","密码")
    dataFrame1.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://node1:3306/demo?serverTimezone=UTC","student_info",properties)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

4、查看结构和数据

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}

/**
 * DataFrame中查看操作
 */
object SeeDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("SeeDemo")
    val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    import sparkSession.implicits._
    val seq: Seq[(String, Int, String)] = Array(("cy", 32, "男"), ("hr", 23, "女"), ("sq", 32, "男"), ("ly", 35, "女"))
    val dataFrame = seq.toDF("name", "age", "sex")

    // 打印DataFrame的数据结构
    dataFrame.printSchema()

    /**
     * show展示前n条数据
     * show()  值打印DataFrame中的前20条数据,并且每一个数据超过20个字符将被截取
     * show(num)  值打印DataFrame中的前num条数据,并且每一个数据超过20个字符将被截取
     * show(num,true/false)  值打印DataFrame中的前num条数据,并且每一个数据是否被截取
     */
    dataFrame.show()
    dataFrame.show(3)
    dataFrame.show(3, false)

    /**
     * first、head、take、takeAsList 获取DataFrame中的前若干行数据
     */
    // first获取第一行数据
    val row: Row = dataFrame.first()
    println(row)
    // head 获取前n行数据
    val array: Array[Row] = dataFrame.head(2)
    println(array.mkString(","))
    // take 获取前n行数据
    val array1: Array[Row] = dataFrame.take(2)
    println(array1.mkString(","))
    // takeAsList
    val list: java.util.List[Row] = dataFrame.takeAsList(2)
    println(list)

    /**
     * collect/collectAsList   获取DataFrame中的所有数据——会造成内存溢出
     */
    val array2: Array[Row] = dataFrame.collect()
    println(array2.mkString("->"))
    val list1: java.util.List[Row] = dataFrame.collectAsList()
    println(list1)
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/622581
推荐阅读
相关标签
  

闽ICP备14008679号