当前位置:   article > 正文

spark第三篇sql

spark第三篇sql

sparksql概述
  • 1、sparksql发展史
    • shark为spark提供了分布式数据仓库系统
    • shark依赖于hive的代码、依赖于spark的版本
  • 2、sparksql是什么
    • sparksql是spark的一个模块,主要用来处理结构化数据。
    • 操作sparksql的方式: sql 、dataframe、dataSet
sparksql四大特性
  • 1、易整合
    • 把sql语句与spark程序进行无缝混合使用
    • 采用4种Api:java/scala/python/R
  • 2、统一的数据源访问方式
    • 可以通过一种相同的方式对接任何外部数据源
    • sparkSession.read.文件格式(文件格式的路径)
  • 3、兼容hivesql
    • 可以在sparksql中写hivesql
  • 4、支持标准的数据库连接
    • JDBC和ODBC
dataframe概述
  • dataframe是什么

    • dataframe前身是schemaRDD,在spark1.3.0之后才出现的
    • DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
  • DataFrame与RDD的区别

    • RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。

    在这里插入图片描述

  • dataFrame和rdd优缺点

    • rdd优缺点
      • 优点
        • 1、编译时类型安全
        • 2、具备面向对象编程风格
      • 缺点
        • 1、序列化和反序列化性能开销大
        • 2、频繁创建和销毁对象,造成大量的GC
    • dataFrame引入schema和off-heap(使用不在java堆中的内存,直接使用操作系统中的内存),决定了rdd缺点,同时丢失了rdd有点。dataframe不在是类型安全和面向对象编程风格。
通过读取数据源创建dataFrame
  • 1、读取文本文件创建dataFrame
    • sparkSession.read.text(“文本文件格式的路径”)
  • 2、读取json文件创建dataFrame
    • sparkSession.read.json(“json文件格式的路径”)
  • 3、读取parquet文件创建dataFrame
    • sparkSession.read.parquet(“parquet文件格式的路径”)
DataFrame常用操作
  • DSL语法风格

    • 它就是dataFrame自身提供的API

      1、打印DataFrame的schema
         printlnSchema
       2、查看dataFrame中的数据
         show
       3、取出第一位
         first
         head(N) 取出前N个
       4、查看某个字段
         peopleDF.select("name").show
         peopleDF.select(col("name")).show
         peopleDF.select($"name").show
         peopleDF.select(peopleDF("name")).show
       5、取出多个字段
         peopleDF.select("name","age").show
       6、让age字段+1
         peopleDF.select(col("age")+1).show
       7、过滤出年龄大于30的人数
         peopleDF.filter($"age" > 30).count
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
  • SQL风格语法

    • 通过将一个dataFrame注册成一张表,接下来就可以通过sql语句操作dataFrame

      • sparkSession.sql(sql语句)
      1、先需要将DataFrame注册成一张临时表
      	personDF.registerTempTable("t_person")
      	
      2、然后通过sparkSession.sql(sql语句)操作DataFrame
      	sparkSession.sql("select * from t_person").show
      
      • 1
      • 2
      • 3
      • 4
      • 5
DataSet
  • 1、DataSet是什么
    • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
  • 2、DataSet特性
    • 继承了RDD的优点
      • 编译时类型安全
      • 面向对象编程风格
  • 3、创建Dataset
    • 1、spark.createDataSet(“已经存在的scala集合”)
    • 2、spark.createDataSet(“已经存在RDD”)
    • 3、已经存在的scala集合调用toDs
    • 4、通过dataFrame转换生成 as[强类型]
  • 4、dataSet与dataFrame互相转换
    • 1、将dataSet转化生成dataFrame
      • dataSet.toDF
    • 2、将dataFrame转换成dataSet
      • dataFrame.as[强类型]
将RDD转换为DataFrame代码开发
  • 导包

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 1、 通过定义case class样例类利用反射机制推断Schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}
    
    //todo:利用反射机制(case class )指定dataFrame的schema
    
    case class Person(id:Int,name:String,age:Int)
    object CaseClassSchema {
      def main(args: Array[String]): Unit = {
         //1、创建SparkSession
          val spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
         //2、获取sparkContext
          val sc: SparkContext = spark.sparkContext
          sc.setLogLevel("WARN")
         //3、读取数据文件
          val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(x=>x.split(" "))
         //4、将rdd与样例类关联
          val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
         //5、获取DataFrame
          //手动导入隐式转换
          import spark.implicits._
          val personDF: DataFrame = personRDD.toDF
    
          //---------------DSL语法-------------start
         //1、打印dataframe的schema元信息
          personDF.printSchema()
         //2、 显示dataFrame结果数据
         personDF.show()
         personDF.show(2)
         //3、显示第一条数据
         println(personDF.head())
         //4、查询name字段结果数据
        personDF.select("name").show()
        personDF.select($"name").show()
        personDF.select(new Column("name")).show()
         //5、把age字段结果加1
        personDF.select($"id",$"name",$"age",$"age"+1).show()
         //6、把age 大于30的人过滤出来
        personDF.filter($"age" > 30).show()
         //7、按照age进行分组
        personDF.groupBy("age").count().show()
    
        //---------------DSL语法-------------end
    
        //--------------SQL语法--------------start
         //把dataFrame注册成一张表
          personDF.createTempView("t_person")
        //通过sparksession调用sql方法
          spark.sql("select * from t_person").show()
          spark.sql("select * from t_person where name='lisi'").show()
          spark.sql("select * from t_person order by age desc ").show()
        //--------------SQL语法--------------end
    
        //关闭操作
        sc.stop()
        spark.stop()
    
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
  • 2、通过StructType指定schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    //todo:将rdd转换成dataFrame,通过StructType来指定schema
    object SparkSqlSchema {
      def main(args: Array[String]): Unit = {
           //1、创建sparkSession
           val spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()
           //2、获取得到sparkContext
            val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
           //3、读取数据文件
            val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(_.split(" "))
           //4、将rdd与Row类型关联
            val rowRDD: RDD[Row] = data.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
           //5、指定schema
            val schema:StructType=StructType(
                                           StructField("id", IntegerType, true) ::
                                           StructField("name", StringType, false) ::
                                           StructField("age", IntegerType, false) :: Nil)
    
           val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    
            //打印schema
          personDF.printSchema()
            //显示数据
          personDF.show()
    
          //dataframe注册成一张表
          personDF.createTempView("t_person")
    
         spark.sql("select * from t_person").show()
    
         //关闭
        sc.stop()
        spark.stop()
    
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
sparksql 操作hivesql
  • 导包

            <!--引入 spark-hive依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>2.0.2</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
  • 代码开发

    import org.apache.spark.sql.SparkSession
    
    //todo:利用sparksql操作hivesql
    object HiveSupport {
      def main(args: Array[String]): Unit = {
          //1、创建sparkSession
         val spark: SparkSession = SparkSession.builder()
                                                .appName("HiveSupport")
                                                .master("local[2]")
                                                .enableHiveSupport()   //开启对hivesql的支持
                                                .getOrCreate()
        //2、利用sparkSession操作hivesql
            //2.1 创建hive表
              //spark.sql("create table if not exists student(id int,name string,age int) row format delimited fields terminated by ',' ")
            //2.2 加载数据到hive表中
              //spark.sql("load data local inpath './data/student.txt' into table student")
            //2.3 查询表中数据
              spark.sql("select * from student").show()
    
        //3、关闭
          spark.stop()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
sparksql读取mysql表中的数据
  • 代码开发

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql从mysql表中读取数据
    object DataFromMysql {
      def main(args: Array[String]): Unit = {
         //1、创建sparkSession
          val spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    
        //2、通过sparksession读取mysql表中的数据
          //定义url
          val url="jdbc:mysql://192.168.200.100:3306/spark"
          //定义表名
          val tableName="iplocation"
          //定义相关的属性
          val properties=new Properties
           properties.setProperty("user","root")
           properties.setProperty("password","123456")
          val jdbcDataFrame: DataFrame = spark.read.jdbc(url,tableName,properties)
        //显示schema
        jdbcDataFrame.printSchema()
        //打印结果数据
        jdbcDataFrame.show()
    
        //关闭
        spark.stop()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • spark-shell 操作读取mysql表中

    • 启动spark-shell脚本

      spark-shell \
      --master spark://hdp-node-01:7077 \
      --executor-memory 1g \
      --total-executor-cores  2 \
      --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
      --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
    • 执行代码

      val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.200.100:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()
      
      • 1
sparksql将结果数据写入到mysql中
  • 代码开发

    import java.util.Properties
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    //todo:利用sparksql将结果数据写入到mysql表中
    
    case class Student(id:Int,name:String,age:Int)
    object Data2Mysql {
      def main(args: Array[String]): Unit = {
         //1、创建sparkSession
          val spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate()
         //2、获取sparkcontext
          val sc: SparkContext = spark.sparkContext
            sc.setLogLevel("WARN")
         //3、读取数据文件
          val rdd: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" "))
         //4、将样例类与rdd进行关联
          val studentRDD: RDD[Student] = rdd.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
         //5、将rdd转换成dataframe
         import spark.implicits._
          val studentDF: DataFrame = studentRDD.toDF
    
          //打印结果数据
          studentDF.show()
    
          //dataFrame注册成一张表
        studentDF.createTempView("t_student")
    
          //通过sparkSession操作这个表
        val result: DataFrame = spark.sql("select * from t_student order by age desc")
          //把结果数据写入到mysql表中
          //定义url
          val url="jdbc:mysql://192.168.200.100:3306/spark"
        //定义表名
         val tableName=args(1)
        //定义相关的属性
        val properties=new Properties
        properties.setProperty("user","root")
        properties.setProperty("password","123456")
        //mode:指定数据插入模式
        //overwrite: 覆盖(事先会创建一张表)
        //append: 追加(事先会创建一张表)
        //ignore:忽略(如果当前这个表已经存在,不执行操作)
        //error:如果当前这个表存在,这个时候就报错
        result.write.mode("append").jdbc(url,tableName,properties)
    
        //关闭
        sc.stop()
        spark.stop()
        }  
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 把程序打成jar包 提交到集群中运行

    
    spark-submit --master spark://node1:7077 --class cn.包名.sql.Data2Mysql --executor-memory 1g --total-executor-cores 2 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class06-2.0.jar /person.txt student100
    
    • 1
    • 2
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/691635
推荐阅读
相关标签
  

闽ICP备14008679号