当前位置:   article > 正文

SparkSQL系列-4、数据处理分析_要求通过rdd和spark sql两种方式对数据源的数据进行统计分析

要求通过rdd和spark sql两种方式对数据源的数据进行统计分析

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

传送门:大数据系列文章目录

官方网址http://spark.apache.org/http://spark.apache.org/sql/
在这里插入图片描述

有几种方式处理?

SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:

DSL

DSL(domain-specific language)编程,调DataFrame/Dataset API(函数),类似RDD中函数;

SQL

SQL 编程,将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;

两种方式底层转换为RDD操作,包括性能优化完全一致,在实际项目中语句不通的习惯及业务灵活选择。比如机器学习相关特征数据处理,习惯使用DSL编程;比如数据仓库中数据ETL和报表分析,习惯使用SQL编程。无论哪种方式,都是相通的,必须灵活使用掌握。

基于DSL分析

调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL语句函数,部分截图如下:
在这里插入图片描述
类似SQL语法函数:调用Dataset中API进行数据分析, Dataset中涵盖很多函数,大致分类如下:

1、选择函数select:选取某些列的值
在这里插入图片描述
2、过滤函数filter/where:设置过滤条件,类似SQL中WHERE语句
在这里插入图片描述
3、分组函数groupBy/rollup/cube:对某些字段分组,在进行聚合统计
在这里插入图片描述
4、聚合函数agg:通常与分组函数连用,使用一些count、 max、 sum等聚合函数操作
在这里插入图片描述
5、排序函数sort/orderBy:按照某列的值进行排序(升序ASC或者降序DESC)
在这里插入图片描述
6、限制函数limit:获取前几条数据,类似RDD中take函数
在这里插入图片描述
7、重命名函数withColumnRenamed:将某列的名称重新命名
在这里插入图片描述

8、删除函数drop:删除某些列
在这里插入图片描述
9、增加列函数withColumn:当某列存在时替换值,不存在时添加此列
在这里插入图片描述

上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意, 调用函数时,通常指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象。
在这里插入图片描述
Dataset/DataFrame中转换函数,类似RDD中Transformation函数,使用差不多:
在这里插入图片描述

基于SQL分析

将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:

第一步、注册为临时视图
在这里插入图片描述
第二步、编写SQL,执行分析
在这里插入图片描述
其中SQL语句类似Hive中SQL语句,查看Hive官方文档, SQL查询分析语句语法,官方文档文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
在这里插入图片描述

案例:电影评分数据分析

使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:

对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)

数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:

1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
1::938::4::978301752
1::2398::4::978302281
1::2918::4::978302124
1::1035::5::978301753
1::2791::4::978302188
1::2687::3::978824268
1::2018::4::978301777
1::3105::5::978301713
1::2797::4::978302039
1::2321::3::978302205
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

数据处理分析步骤如下:
⚫ 第一步、读取电影评分数据,从本地文件系统读取
⚫ 第二步、转换数据,指定Schema信息,封装到DataFrame
⚫ 第三步、基于SQL方式分析
⚫ 第四步、基于DSL方式分析

数据 ETL

读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:

    val spark: SparkSession = SparkSession.builder()
//    SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
      .config("spark.sql.shuffle.partitions",2)
      .appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    //引入隐式转换
    import spark.implicits._
    //引入DSL函数库
    import org.apache.spark.sql.functions._

    //todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
    val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")

    //step2:转换数据
    val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
      val Array(userId, itemId, rating, timestamp) = line.split("::")
      (userId, itemId, rating.toDouble, timestamp.toLong)
    })
    //所有字段重命名
    val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
    /*
	root
	|-- userId: string (nullable = true)
	|-- movieId: string (nullable = true)
	|-- rating: double (nullable = false)
	|-- timestamp: long (nullable = false)
	*/
	//ratingsDF.printSchema()
	/*
	+------+-------+------+---------+
	|userId|movieId|rating|timestamp|
	+------+-------+------+---------+
	| 1| 1193| 5.0|978300760|
	| 1| 661| 3.0|978302109|
	| 1| 594| 4.0|978302268|
	| 1| 919| 4.0|978301368|
	+------+-------+------+---------+
*/
	//ratingsDF.show(4)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

使用SQL分析

首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:

import model.MovieRating
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

/**
 * @author: lwh
 * @date: 2022/08/15
 *        使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
 *        用,业务需求说明:
 *        对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
 *        的次数大于2000)。
 **/
object RatingsSparkSQL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._

    val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")

    //RDD反射构建Schema
    val dataRdd: RDD[MovieRating] = inputRDD.map(line => {
      val Array(userId, itemId, rating, timestamp) = line.split("::")
      MovieRating(userId.toInt, itemId.toInt, rating.toInt, timestamp.toInt)
    })
    val df: DataFrame = dataRdd.toDF()
    df.show()
    println("====================")
    val ds: Dataset[MovieRating] = dataRdd.toDS()
    ds.show()
    println("====================")

    df.createOrReplaceTempView("data_df_tmp")
    ds.createOrReplaceTempView("data_ds_tmp")

    val rsData: DataFrame = spark.sql(
      """
        |select
        | itemId,
        | round(avg(rating),2) as rat,
        | count(1) as cnt
        |from data_df_tmp
        |group by itemId having cnt > 2000
        |order by rat desc,cnt desc
        |limit 10
        |""".stripMargin)

    rsData.printSchema()
    rsData.show()

    rsData.write.mode(SaveMode.Overwrite).csv("datas/sparksql/output1")
    spark.stop()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

运行程序结果如下:

+------+--------+----+
|itemId|avg_rate| cnt|
+------+--------+----+
|   318|    4.55|2227|
|   858|    4.52|2223|
|   527|    4.51|2304|
|  1198|    4.48|2514|
|   260|    4.45|2991|
|  2762|    4.41|2459|
|   593|    4.35|2578|
|  2028|    4.34|2653|
|  2858|    4.32|3428|
|  2571|    4.32|2590|
+------+--------+----+
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

使用DSL分析

调用Dataset中函数,采用链式编程分析数据,核心代码如下:

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

/**
 * @author: lwh
 * @date: 2022/08/15
 *        使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
 *        用,业务需求说明:
 *        对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
 *        的次数大于2000)。
 **/
object RatingsSparkDSL {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
//    SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
      .config("spark.sql.shuffle.partitions",2)
      .appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()

    spark.sparkContext.setLogLevel("WARN")
    //引入隐式转换
    import spark.implicits._
    //引入DSL函数库
    import org.apache.spark.sql.functions._

    //todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
    val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")

    //step2:转换数据
    val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
      val Array(userId, itemId, rating, timestamp) = line.split("::")
      (userId, itemId, rating.toDouble, timestamp.toLong)
    })
    //所有字段重命名
    val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
    //列的过滤,将用到的列过滤:itemId,rating
    val rsData2: Dataset[Row] = rsdf.select($"itemId",$"rating").groupBy($"itemId")
      .agg(
        //统计平均分
        round(avg($"rating"), 2).as("avg_rate"),
        //统计评分次数
        count($"itemId").as("cnt")
      )
      //过滤:评分次数大于2000
      .where($"cnt".gt(2000))
      //排序
      .orderBy($"avg_rate".desc, $"cnt".desc)
      .limit(10)
    //step3:保存结果
    rsData2.printSchema()
    rsData2.show()

    //保存到MySQL
    rsData2
      .write
      .mode(SaveMode.Overwrite)
        .option("user","root")
        .option("password","123456")
        .jdbc("jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","test.tb_top10_movies",new Properties())

//    Thread.sleep(1000000000000000L)


    spark.stop()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

其中使用SparkSQL中自带函数库functions,org.apache.spark.sql.functions中,包含常用函数,有些与Hive中函数库类似,但是名称不一样。
在这里插入图片描述
使用需要导入函数库: import org.apache.spark.sql.functions._

保存结果数据

将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。

// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
	.coalesce(1) // 考虑降低分区数目
	.write
	.mode("overwrite")
	.option("driver", "com.mysql.cj.jdbc.Driver")
	.option("user", "root")
	.option("password", "123456")
	.jdbc("jdbc:mysql://localhost:3306/serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","db_test.tb_top10_movies",new Properties())
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
	.coalesce(1)
	.write.mode("overwrite")
	.csv("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

查看数据库中结果表的数据:
在这里插入图片描述

案例完整代码

上面的SQL和DSL已经是完整代码了,再来一份整合在一起的完整代码。

电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:

import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel

/**
 * 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
 */
object SparkTop10Movie {
  def main(args: Array[String]): Unit = {
    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      // TODO: 设置shuffle时分区数目
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
    // 1. 读取电影评分数据,从本地文件系统读取
    val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
    // 2. 转换数据
    val ratingsDF: DataFrame = rawRatingsDS
      // 过滤数据
      .filter(line => null != line && line.trim.split("::").length == 4)
      // 提取转换数据
      .mapPartitions { iter =>
        iter.map { line =>
          // 按照分割符分割,拆箱到变量中
          val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
          // 返回四元组
          (userId, movieId, rating.toDouble, timestamp.toLong)
        }
      }
      // 指定列名添加Schema
      .toDF("userId", "movieId", "rating", "timestamp")
    /*
    root
    |-- userId: string (nullable = true)
    |-- movieId: string (nullable = true)
    |-- rating: double (nullable = false)
    |-- timestamp: long (nullable = false)
    */
    //ratingsDF.printSchema()
    /*
    +------+-------+------+---------+
    |userId|movieId|rating|timestamp|
    +------+-------+------+---------+
    | 1| 1193| 5.0|978300760|
    | 1| 661| 3.0|978302109|
    | 1| 594| 4.0|978302268|
    | 1| 919| 4.0|978301368|
    +------+-------+------+---------+
    */
    //ratingsDF.show(4)
    // TODO: 基于SQL方式分析
    // 第一步、注册DataFrame为临时视图
    ratingsDF.createOrReplaceTempView("view_temp_ratings")
    // 第二步、编写SQL
    val top10MovieDF: DataFrame = spark.sql(
      """
        |SELECT
        | movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
        |FROM
        | view_temp_ratings
        |GROUP BY
        | movieId
        |HAVING
        | cnt_rating > 2000
        |ORDER BY
        | avg_rating DESC, cnt_rating DESC
        |LIMIT
        | 10
      """.stripMargin)
    //top10MovieDF.printSchema()
    top10MovieDF.show(10, truncate = false)
    println("===============================================================")
    // TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
    import org.apache.spark.sql.functions._
    val resultDF: DataFrame = ratingsDF
      // 选取字段
      .select($"movieId", $"rating")
      // 分组:按照电影ID,获取平均评分和评分次数
      .groupBy($"movieId")
      .agg( //
        round(avg($"rating"), 2).as("avg_rating"), //
        count($"movieId").as("cnt_rating") //
      )

      // 过滤:评分次数大于2000
      .filter($"cnt_rating" > 2000)
      // 排序:先按照评分降序,再按照次数降序
      .orderBy($"avg_rating".desc, $"cnt_rating".desc)
      // 获取前10
      .limit(10)
    //resultDF.printSchema()
    resultDF.show(10)
    // TODO: 将分析的结果数据保存MySQL数据库和CSV文件
    // 结果DataFrame被使用多次,缓存
    resultDF.persist(StorageLevel.MEMORY_AND_DISK)
    // 1. 保存MySQL数据库表汇总
    resultDF
      .coalesce(1) // 考虑降低分区数目
      .write
      .mode("overwrite")
      .option("driver", "com.mysql.cj.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .jdbc(
        "jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
        "test.tb_top10_movies",
        new Properties()
      )
    // 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
    resultDF
      .coalesce(1)
      .write.mode("overwrite")
      .csv("datas/top10-movies")
    // 释放缓存数据
    resultDF.unpersist()
    // 应用结束,关闭资源
    Thread.sleep(10000000)
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124

Shuffle 分区数目

运行上述程序时,查看WEB UI监控页面发现, 某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
在这里插入图片描述
原因: 在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。 在构建SparkSession实例对象时,设置参数的值:

    // 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      // TODO: 设置shuffle时分区数目
      .config("spark.sql.shuffle.partitions", "4")
      .getOrCreate()
    // 导入隐式转换
    import spark.implicits._
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

下回分解

下篇文章我们来看下DataSet的分析,以及RDD、DF与DS的转换。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/747499
推荐阅读
相关标签
  

闽ICP备14008679号