当前位置:   article > 正文

Sparksql的介绍以及常见操作_spark sql基本操作

spark sql基本操作

撰写本文的目的:对于sparksql,网上有大量的详细文档,本人针对常用的操作进行一个整理,当然有大多数都是从其他地方搬过来的,包括官方文档以及其他网友的一些分享,一来是通过此次整理加强自己的记忆,二来如果有幸帮到某位网友,那是本人莫大的荣幸,先感谢您的阅读,废话不多说,进入正文:

    下文所涉及到的相关软件版本分别为:

    spark版本:v2.2.0

    hive  :  v1.2.1

    hadoop :  v2.7.6

前言:

    Spark sql是spark处理结构化数据的一个模块,它的前身是shark,与基础的spark rdd不同,spark sql提供了结构化数据及计算结果等信息的接口,在内部,spark sql使用这个额外的信息去执行额外的优化,有几种方式可以跟spark sql进行交互,包括sql和dataset api,使用相同的执行引擎进行计算的时候,无论是使用哪一种计算引擎都可以一快速的计算。

Dataset and DataFrames

  RDD:在spark刚开始的时候,引入RDD(弹性分布式数据集)

    优点:

      1)编译时类型安全,编译时就能检查出类型错误

      2)面向对象的编程分格,直接通过类名点的方式来操作数据

      例如:idAge.filter(_.age > "") //编译时直接报错

         idAgeRDDPerson.filter(_.age > 25) //直接操作一个个的person对象

    缺点:

      1)序列化和反序列化的性能开销,无论是集群间的通信还是IO操作,都需要对对象的结果和数据进行序列化和反序列化

      2)GC的性能开销,频繁的创建和销毁对象,势必会增加GC

  DataFrame:spark1.3的时候引入了DataFrmae,是一个列方式组织的分布式数据集

    优点:

      1)引入了Schema,包含了一ROW位单位的每行数据的列信息,spark通过Schema就能够读懂数据,因此在通信和IO时就只需要序列化和反序列化数据,而结构的部分就可以省略了;

      2)off-heap:spark能够以二进制的形式序列化数据(不包括结构)到off-heap(堆外内存),当要操作数据时,就直接操作off-heap内存,off-heap类似于地盘,schema类似于地图,Spark有了地图又有了自己地盘了,就可以自己说了算,不再受JVM的限制,也就不再受GC的困扰了,通过Schema和off-heap,DataFrame克服了RDD的缺点。对比RDD提升了计算效率,减少了数据的读取,底层计算优化

      3)引入了新的引擎:Tungsten

      4)引入了新的语法解析框架:Catalyst

    缺点:

      DataFrame客服了RDD 的缺点,但是丢失了RDD的优点,DataFrame不是类型安全的,API也不是面向对象分格的。

      1)API不是面向对象的

        idAgeDF.filter(idAgeDF.col("age") > 22)

      2)DataFrame不是编译时类型安全的,下面这种情况下不会报错

        idAgeDF.filter(idAgeDF.col("age") > "")

  DataSet:到spark1.6的时候引入了DataSet,Encoder分布式数据集,是一个被添加的新接口,它提供了RDD 的优点(强类型化,能够使用强大的lambda函数)

 
  1. /**
  2. * @groupname basic Basic Dataset functions
  3. * @groupname action Actions
  4. * @groupname untypedrel Untyped transformations
  5. * @groupname typedrel Typed transformations
  6. *
  7. * @since 1.6.0
  8. */
  9. @InterfaceStability.Stable
  10. class Dataset[T] private[sql](
  11. @transient val sparkSession: SparkSession,
  12. @DeveloperApi @InterfaceStability.Unstable @transient val queryExecution: QueryExecution,
  13. encoder: Encoder[T])
  14. extends Serializable {

       DataSet是一个类,其中包含了三个参数:

  1.      DataFrame

      1、集合转DataFrame

     
      
    1. val ssc = SparkSession().Builder.master("test").appName("test").getOrCreate
    2. val seq1 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    3. val df1 = ssc.createDataFrame(seq1).withColumnRenamed("_1", "name1").
    4. withColumnRenamed("_2", "age1").withColumnRenamed("_3", "height1")
    5. df1.orderBy(desc("age1")).show(10)
    6. import ssc.implicit._
    7. val df2 = ssc.createDataFrame(seq1).toDF("name", "age", "height")

       2、RDD转DataFrame

     
      
    1. import org.apache.spark.sql.Row
    2. import org.apache.spark.sql.types._
    3. val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    4. val rdd1 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))
    5. val schema = StructType( StructField("name", StringType, false) ::
    6. StructField("age", IntegerType, false) ::
    7. StructField("height", IntegerType, false) :: Nil)<br>// false:说明该字段不允许为null true:说明该字段可以为null
    8. val rddToDF = spark.createDataFrame(rdd1, schema)
    9. rddToDF.orderBy(desc("name")).show(false)

      DataSet

      1、由range生成DataSet

     
      
    1. val numDS = spark.range(5,100,5)
    2. numDS.orderBy(desc("id")).show(5)
    3. numDS.describe().show

      

       2、由集合生成DS

     
      
    1. case class Person(name:String, age:Int, height:Int)
    2. val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
    3. val spark:SparkSession = SparkSession.Builder....
    4. val ds1 = spark.createDataset(seq1)
    5. ds1.show
    6. val seq2 = Seq(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    7. val ds2 = spark.createDataset(seq2)
    8. ds2.show

       3、由RDD进行转换

     
      
    1. import org.apache.spark.sql.types._
    2. import org.apache.spark.sql.Row
    3. val arr = Array(("Jack", 28, 184), ("Tom", 10, 144), ("Andy", 16, 165))
    4. val rdd2 = sc.makeRDD(arr).map(f=>Row(f._1, f._2, f._3))<br>val rdd3 = sc.makeRDD(arr).map(f=>Person(r._1,f._2,f._3))
    5. val ds3 = sc.createDataset(rdd2)<br>val ds4 = rdd3.toDS()
    6. ds3.show(10)

     通过SparkSession读取文件

     
      
    1. import org.apache.spark.sql.types._
    2. val schema2 = StructType( StructField("name", StringType, false) ::
    3. StructField("age", IntegerType, false) ::
    4. StructField("height", IntegerType, false) :: Nil)
    5. val df7 = ssc.read.options(Map(("delimiter", ","), ("header", "false"))).schema(schema2).csv("file:///home/spark/t01.csv") // 读取本地文件
    6. df7.show()

    DataSet的基础函数

     
      
    1. import org.apache.spark.storage.StorageLevel
    2. import org.apache.spark.sql.types._
    3. case class Person(name:String, age:Int, height:Int)
    4. spark.sparkContext.setCheckpointDir("hdfs://node1:8020/checkpoint")
    5. // 1 DataSet存储类型
    6. val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
    7. val ds1 = spark.createDataset(seq1)
    8. ds1.show()
    9. ds1.checkpoint()
    10. ds1.cache()
    11. ds1.persist(StorageLevel.MEMORY_ONLY)
    12. ds1.count()
    13. ds1.show()
    14. ds1.unpersist(true)
    15. // 2 DataSet结构属性
    16. ds1.columns
    17. ds1.dtypes
    18. ds1.explain()
    19. ds1.col("name")
    20. ds1.printSchema
    21. // 常用
    22. // 3 DataSet rdd数据互转
    23. val rdd1 = ds1.rdd
    24. val ds2 = rdd1.toDS()
    25. ds2.show()
    26. val df2 = rdd1.toDF()
    27. df2.show()
    28. // 4 Dataset保存文件
    29. ds1.select("name", "age", "height").write.format("csv").save("data/sql1/my01.csv")
    30. // 读取保存的文件
    31. val schema2 = StructType( StructField("name", StringType, false) ::
    32. StructField("age", IntegerType, false) ::
    33. StructField("height", IntegerType, false) :: Nil)
    34. val out = spark.read.
    35. options(Map(("delimiter", ","), ("header", "false"))).
    36. schema(schema2).csv("data/sql1/*")
    37. out.show(10)

    DataSet的Action操作

     
      
    1. // 1 显示数据集
    2. val seq1 = Seq(Person("Jack", 28, 184), Person("Tom", 10, 144), Person("Andy", 16, 165))
    3. val ds1 = spark.createDataset(seq1)
    4. // 缺省显示20行
    5. ds1.show()
    6. // 显示2行
    7. ds1.show(2)
    8. // 显示20行,不截断字符
    9. ds1.show(20, false)
    10. // 2 获取数据集
    11. // collect返回的是数组
    12. val c1 = ds1.collect()
    13. // collectAsList返回的是List
    14. val c2 = ds1.collectAsList()
    15. val h2 = ds1.head()
    16. val h2 = ds1.head(3)
    17. val f1 = ds1.first()
    18. val f2 = ds1.take(2)
    19. val t2 = ds1.takeAsList(2)
    20. ds.limit(10).show
    21. // 取10行数据生成新的DataSet
    22. // 3 统计数据集
    23. ds1.count()
    24. // 返回全部列的统计(count、mean、stddev、min、max)
    25. ds1.describe().show
    26. // 返回指定列的统计(count、mean、stddev、min、max)
    27. ds1.describe("age").show
    28. ds1.describe("age", "height").show
    29. // 4 聚集
    30. ds1.reduce{ (f1, f2) => Person("sum", f1.age+f2.age, f1.height+f2.height) }

      

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/589137
推荐阅读
相关标签
  

闽ICP备14008679号