当前位置:   article > 正文

【大数据实训日记】Day10 Spark SQL基础_本任务主要针对sparksql进行实训。1. 数据准

本任务主要针对sparksql进行实训。1. 数据准

一、 Spark SQL介绍

​        Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSON、Parquet、Avro、CSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。

二、Spark SQL的主要特点:

1.将SQL查询与Spark应用程序无缝组合

​     Spark SQL允许使用SQL或熟悉的API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。

2.可以连接到多种数据源

​     Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。

3. 在现有的数据仓库上运行SQL或HiveQL查询

​     Spark SQL支持HiveQL语法以及Hive SerDes和UDF (用户自定义函数) ,允许访问现有的Hive仓库。

4.DataFrame和DataSet

- DataFrame的结构

​      DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。

​     DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息) ,因此看起来更像是一张数据库表。

- DataSet的结构

   Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相比于RDD, Dataset提供了强类型支持,在RDD的每行数据加了类型约束。

​     在Spark中,一个DataFrame代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。

三、Spark SQL的基本使用

​     Spark Shell启动时除了默认创建一个名为sc的SparkContext的实例外,还创建了一个名为spark的SparkSession实例,该spark变量可以在Spark Shell中直接使用。

​     SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。

四、Spark SQL基本使用案例

在HDFS中有一个文件/input/person.txt,文件内容如下:

现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤如下:

进入spark-shell环境

- 加载数据为Dataset
val d1 = spark.read.textFile("hdfs://192.168.121.131:9000/input/person.txt")

d1.show() # 查看d1中的数据内容

​      从上述代码的结果可以看出,Dataset将文件中的每一行看作一个元素,并且所有元素组成了一列,列名默认为value。

- 给Dataset添加元数据信息

​    定义一个样例类Person,用于存放数据描述信息,代码如下:
case class Person(id:Int,name:String,age:Int)

​    注:Scala有一种特殊的类叫做样例类(case class)。默认情况下,样例类一般用于不可变对象(样例类构造参数默认声明为val)。

​    调用Dataset的map()算子将每一个元素拆分并存入Person类中,代码如下:
val personDataset = d1.map(line=>{
          val fields = line.split(",")
          val id = fields(0).toInt
          val name = fields(1)
          val age = fields(2).toInt
          Person(id,name,age)
          })
          
personDataset.show() # 查看personDataset中的数据内容       

可以看到,personDataset中的数据类似于一张关系型数据库的表。

- 将Dataset转为DataFrame

​     Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。

​    调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码如下:

val pdf = personDataset.toDF()

- 执行SQL查询

​    在DataFrame上创建一个临时视图v_person,并使用SparkSession对象执行SQL查询,代码如下:
pdf.createTempView("v_person")
val result = spark.sql("select * from v_person order by age desc")
result.show()

五、Spark SQL函数

1.内置函数

​     Spark SQL内置了大量的函数,位于API org.apache.spark.sql.functions中。其中大部分函数与Hive中的相同。

​     使用内置函数有两种方式:一种是通过编程的方式使用;另一种是在SQL语句中使用。

- 以编程的方式使用lower()函数将用户姓名转为小写/大写,代码如下:

```
df.select(lower(col("name")).as("greet")).show()
df.select(upper(col("name")).as("greet")).show()
```

​     上述代码中,df指的是DataFrame对象,使用select()方法传入需要查询的列,使用as()方法指定列的别名。代码col("name")指定要查询的列,也可以使用$"name"代替,代码如下:

```
df.select(lower($"name").as("greet")).show()
```

- 以SQL语句的方式使用lower()函数,代码如下:

```
df.createTempView("temp")
spark.sql("select upper(name) as greet from temp").show()
```

​     除了可以使用select()方法查询指定的列外,还可以直接使用filter()、groupBy()等方法对DataFrame数据进行过滤和分组,例如以下代码:

```
df.printSchema()  # 打印Schema信息
df.select("name").show()  # 查询name列
# 查询name列和age列,其中将age列的值增加1
df.select($"name",$"age"+1).show()
df.filter($"age">25).show() # 查询age>25的所有数据
# 根据age进行分组,并求每一组的数量
df.groupBy("age").count().show() 
```

2.自定义函数

​     当Spark SQL提供的内置函数不能满足查询需求时,用户可以根据需求编写自定义函数(User Defined Functions, UDF),然后在Spark SQL中调用。

​    例如有这样一个需求:为了保护用户的隐私,当查询数据的时候,需要将用户手机号的中间4位数字用星号(*)代替,比如手机号180****2688。这时就可以编写一个自定义函数来实现这个需求,实现代码如下:

  1. package spark.demo.sql
  2. import org.apache.spark.rdd.RDD
  3. import org.apache.spark.sql.types.{StringType, StructField, StructType}
  4. import org.apache.spark.sql.{Row, SparkSession}
  5. /**
  6.  * 用户自定义函数,隐藏手机号中间4
  7.  */
  8. object SparkSQLUDF {
  9.   def main(args: Array[String]): Unit = {
  10.     //创建或得到SparkSession
  11.     val spark = SparkSession.builder()
  12.       .appName("SparkSQLUDF")
  13.       .master("local[*]")
  14.       .getOrCreate()
  15.     //第一步:创建测试数据(或直接从文件中读取)
  16.     //模拟数据
  17.     val arr=Array("18001292080","13578698076","13890890876")
  18.     //将数组数据转为RDD
  19.     val rdd: RDD[String] = spark.sparkContext.parallelize(arr)
  20.     //将RDD[String]转为RDD[Row]
  21.     val rowRDD: RDD[Row] = rdd.map(line=>Row(line))
  22.     //定义数据的schema
  23.     val schema=StructType(
  24.       List{
  25.         StructField("phone",StringType,true)
  26.       }
  27.     )
  28.     //将RDD[Row]转为DataFrame
  29.     val df = spark.createDataFrame(rowRDD, schema)
  30.     //第二步:创建自定义函数(phoneHide)
  31.     val phoneUDF=(phone:String)=>{
  32.       var result = "手机号码错误!"
  33.       if (phone != null && (phone.length==11)) {
  34.         val sb = new StringBuffer
  35.         sb.append(phone.substring(0, 3))
  36.         sb.append("****")
  37.         sb.append(phone.substring(7))
  38.         result = sb.toString
  39.       }
  40.       result
  41.     }
  42.     //注册函数(第一个参数为函数名称,第二个参数为自定义的函数)
  43.     spark.udf.register("phoneHide",phoneUDF)
  44.     //第三步:调用自定义函数
  45.     df.createTempView("t_phone")        //创建临时视图
  46.     spark.sql("select phoneHide(phone) as phone from t_phone").show()
  47.     // +-----------+
  48.     // |      phone|
  49.     // +-----------+
  50.     // |180****2080|
  51.     // |135****8076|
  52.     // |138****0876|
  53.     // +-----------+
  54.   }
  55. }

3.窗口(开窗)函数

​    开窗函数是为了既显示聚合前的数据,又显示聚合后的数据,即在每一行的最后一列添加聚合函数的结果。开窗口函数有以下功能:

- 同时具有分组和排序的功能

- 不减少原表的行数

- 开窗函数语法:

聚合类型开窗函数

```
sum()/count()/avg()/max()/min() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]]) 
```

排序类型开窗函数

```
ROW_NUMBER() OVER([PARTITION BY XXX] [ORDER BY XXX [DESC]])
```

- 以row_number()开窗函数为例:

​        开窗函数row_number()是Spark SQL中常用的一个窗口函数,使用该函数可以在查询结果中对每个分组的数据,按照其排列的顺序添加一列行号(从1开始),根据行号可以方便地对每一组数据取前N行(分组取TopN)。row_number()函数的使用格式如下:

```
row_number() over (partition by 列名 order by 列名 desc) 行号列别名
```

上述格式说明如下:

partition by:按照某一列进行分组;

order by:分组后按照某一列进行组内排序;

desc:降序,默认升序。

例如,统计每一个产品类别的销售额前3名,代码如下:

  1. package spark.demo.sql
  2. import org.apache.spark.sql.types._
  3. import org.apache.spark.sql.{Row, SparkSession}
  4. /**
  5.  * 统计每一个产品类别的销售额前3名(相当于分组求TOPN)
  6.  */
  7. object SparkSQLWindowFunctionDemo {
  8.   def main(args: Array[String]): Unit = {
  9.     //创建或得到SparkSession
  10.     val spark = SparkSession.builder()
  11.       .appName("SparkSQLWindowFunctionDemo")
  12.       .master("local[*]")
  13.       .getOrCreate()
  14.     //第一步:创建测试数据(字段:日期、产品类别、销售额)
  15.     val arr=Array(
  16.       "2019-06-01,A,500",
  17.       "2019-06-01,B,600",
  18.       "2019-06-01,C,550",
  19.       "2019-06-02,A,700",
  20.       "2019-06-02,B,800",
  21.       "2019-06-02,C,880",
  22.       "2019-06-03,A,790",
  23.       "2019-06-03,B,700",
  24.       "2019-06-03,C,980",
  25.       "2019-06-04,A,920",
  26.       "2019-06-04,B,990",
  27.       "2019-06-04,C,680"
  28.     )
  29.     //转为RDD[Row]
  30.     val rowRDD=spark.sparkContext
  31.       .makeRDD(arr)
  32.       .map(line=>Row(
  33.         line.split(",")(0),
  34.         line.split(",")(1),
  35.         line.split(",")(2).toInt
  36.       ))
  37.     //构建DataFrame元数据
  38.     val structType=StructType(Array(
  39.       StructField("date",StringType,true),
  40.       StructField("type",StringType,true),
  41.       StructField("money",IntegerType,true)
  42.     ))
  43.     //将RDD[Row]转为DataFrame
  44.     val df=spark.createDataFrame(rowRDD,structType)
  45.     //第二步:使用开窗函数取每一个类别的金额前3
  46.     df.createTempView("t_sales")        //创建临时视图
  47.     //执行SQL查询
  48.     spark.sql(
  49.       "select date,type,money,rank from " +
  50.         "(select date,type,money," +
  51.         "row_number() over (partition by type order by money desc) rank "+
  52.         "from t_sales) t " +
  53.         "where t.rank<=3"
  54.     ).show()
  55.   }
  56. }

声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号