赞
踩
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
传送门:大数据系列文章目录
官方网址:http://spark.apache.org/、 http://spark.apache.org/sql/
在SparkSQL模块中,将结构化数据封装到DataFrame或Dataset集合中后,提供两种方式分析处理数据,正如前面案例【词频统计WordCount】两种方式:
DSL(domain-specific language)编程,调DataFrame/Dataset API(函数),类似RDD中函数;
SQL 编程,将DataFrame/Dataset注册为临时视图或表,编写SQL语句,类似HiveQL;
两种方式底层转换为RDD操作,包括性能优化完全一致,在实际项目中语句不通的习惯及业务灵活选择。比如机器学习相关特征数据处理,习惯使用DSL编程;比如数据仓库中数据ETL和报表分析,习惯使用SQL编程。无论哪种方式,都是相通的,必须灵活使用掌握。
调用DataFrame/Dataset中API(函数)分析数据,其中函数包含RDD中转换函数和类似SQL语句函数,部分截图如下:
类似SQL语法函数:调用Dataset中API进行数据分析, Dataset中涵盖很多函数,大致分类如下:
1、选择函数select:选取某些列的值
2、过滤函数filter/where:设置过滤条件,类似SQL中WHERE语句
3、分组函数groupBy/rollup/cube:对某些字段分组,在进行聚合统计
4、聚合函数agg:通常与分组函数连用,使用一些count、 max、 sum等聚合函数操作
5、排序函数sort/orderBy:按照某列的值进行排序(升序ASC或者降序DESC)
6、限制函数limit:获取前几条数据,类似RDD中take函数
7、重命名函数withColumnRenamed:将某列的名称重新命名
8、删除函数drop:删除某些列
9、增加列函数withColumn:当某列存在时替换值,不存在时添加此列
上述函数在实际项目中经常使用,尤其数据分析处理的时候,其中要注意, 调用函数时,通常指定某个列名称,传递Column对象,通过隐式转换转换字符串String类型为Column对象。
Dataset/DataFrame中转换函数,类似RDD中Transformation函数,使用差不多:
将Dataset/DataFrame注册为临时视图,编写SQL执行分析,分为两个步骤:
第一步、注册为临时视图
第二步、编写SQL,执行分析
其中SQL语句类似Hive中SQL语句,查看Hive官方文档, SQL查询分析语句语法,官方文档文档:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select
使用电影评分数据进行数据分析,分别使用DSL编程和SQL编程,熟悉数据处理函数及SQL使用,业务需求说明:
对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。
数据集ratings.dat总共100万条数据,数据格式如下,每行数据各个字段之间使用双冒号分开:
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
1::3408::4::978300275
1::2355::5::978824291
1::1197::3::978302268
1::1287::5::978302039
1::2804::5::978300719
1::594::4::978302268
1::919::4::978301368
1::595::5::978824268
1::938::4::978301752
1::2398::4::978302281
1::2918::4::978302124
1::1035::5::978301753
1::2791::4::978302188
1::2687::3::978824268
1::2018::4::978301777
1::3105::5::978301713
1::2797::4::978302039
1::2321::3::978302205
数据处理分析步骤如下:
⚫ 第一步、读取电影评分数据,从本地文件系统读取
⚫ 第二步、转换数据,指定Schema信息,封装到DataFrame
⚫ 第三步、基于SQL方式分析
⚫ 第四步、基于DSL方式分析
读取电影评分数据,将其转换为DataFrame,使用指定列名方式定义Schema信息,代码如下:
val spark: SparkSession = SparkSession.builder()
// SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
.config("spark.sql.shuffle.partitions",2)
.appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//引入隐式转换
import spark.implicits._
//引入DSL函数库
import org.apache.spark.sql.functions._
//todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
//step2:转换数据
val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
val Array(userId, itemId, rating, timestamp) = line.split("::")
(userId, itemId, rating.toDouble, timestamp.toLong)
})
//所有字段重命名
val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)
首先将DataFrame注册为临时视图,再编写SQL语句,最后使用SparkSession执行,代码如下:
import model.MovieRating
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}
/**
* @author: lwh
* @date: 2022/08/15
* 使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
* 用,业务需求说明:
* 对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
* 的次数大于2000)。
**/
object RatingsSparkSQL {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
//RDD反射构建Schema
val dataRdd: RDD[MovieRating] = inputRDD.map(line => {
val Array(userId, itemId, rating, timestamp) = line.split("::")
MovieRating(userId.toInt, itemId.toInt, rating.toInt, timestamp.toInt)
})
val df: DataFrame = dataRdd.toDF()
df.show()
println("====================")
val ds: Dataset[MovieRating] = dataRdd.toDS()
ds.show()
println("====================")
df.createOrReplaceTempView("data_df_tmp")
ds.createOrReplaceTempView("data_ds_tmp")
val rsData: DataFrame = spark.sql(
"""
|select
| itemId,
| round(avg(rating),2) as rat,
| count(1) as cnt
|from data_df_tmp
|group by itemId having cnt > 2000
|order by rat desc,cnt desc
|limit 10
|""".stripMargin)
rsData.printSchema()
rsData.show()
rsData.write.mode(SaveMode.Overwrite).csv("datas/sparksql/output1")
spark.stop()
}
}
运行程序结果如下:
+------+--------+----+
|itemId|avg_rate| cnt|
+------+--------+----+
| 318| 4.55|2227|
| 858| 4.52|2223|
| 527| 4.51|2304|
| 1198| 4.48|2514|
| 260| 4.45|2991|
| 2762| 4.41|2459|
| 593| 4.35|2578|
| 2028| 4.34|2653|
| 2858| 4.32|3428|
| 2571| 4.32|2590|
+------+--------+----+
调用Dataset中函数,采用链式编程分析数据,核心代码如下:
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
/**
* @author: lwh
* @date: 2022/08/15
* 使用电影评分数据进行数据分析,分别使用 DSL编程和SQL编程,熟悉数据处理函数及SQL使
* 用,业务需求说明:
* 对电影评分数据进行统计分析, 获取Top10电影(电影评分平均值最高,并且每个电影被评分
* 的次数大于2000)。
**/
object RatingsSparkDSL {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
// SparkSQL程序不论数据量的多少,在经过聚合shuffle时,RDD分区数会变为200
.config("spark.sql.shuffle.partitions",2)
.appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[2]").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//引入隐式转换
import spark.implicits._
//引入DSL函数库
import org.apache.spark.sql.functions._
//todo:2-实现处理的逻辑:统计平均评分最高的前10部电影【每部电影至少被评分2000次】
val inputRDD: RDD[String] = spark.sparkContext.textFile("datas/ml-1m/ratings.dat")
//step2:转换数据
val rsData: RDD[(String, String, Double, Long)] = inputRDD.map(line => {
val Array(userId, itemId, rating, timestamp) = line.split("::")
(userId, itemId, rating.toDouble, timestamp.toLong)
})
//所有字段重命名
val rsdf: DataFrame = rsData.toDF("userId", "itemId", "rating", "timestamp")
//列的过滤,将用到的列过滤:itemId,rating
val rsData2: Dataset[Row] = rsdf.select($"itemId",$"rating").groupBy($"itemId")
.agg(
//统计平均分
round(avg($"rating"), 2).as("avg_rate"),
//统计评分次数
count($"itemId").as("cnt")
)
//过滤:评分次数大于2000
.where($"cnt".gt(2000))
//排序
.orderBy($"avg_rate".desc, $"cnt".desc)
.limit(10)
//step3:保存结果
rsData2.printSchema()
rsData2.show()
//保存到MySQL
rsData2
.write
.mode(SaveMode.Overwrite)
.option("user","root")
.option("password","123456")
.jdbc("jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","test.tb_top10_movies",new Properties())
// Thread.sleep(1000000000000000L)
spark.stop()
}
}
其中使用SparkSQL中自带函数库functions,org.apache.spark.sql.functions中,包含常用函数,有些与Hive中函数库类似,但是名称不一样。
使用需要导入函数库: import org.apache.spark.sql.functions._
将分析结果数据保存到外部存储系统中,比如保存到MySQL数据库表中或者CSV文件中。
// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数目
.write
.mode("overwrite")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.jdbc("jdbc:mysql://localhost:3306/serverTimezone=UTC&characterEncoding=utf8&useUnicode=true","db_test.tb_top10_movies",new Properties())
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
.coalesce(1)
.write.mode("overwrite")
.csv("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist()
查看数据库中结果表的数据:
上面的SQL和DSL已经是完整代码了,再来一份整合在一起的完整代码。
电影评分数据分析,经过数据ETL、数据分析(SQL分析和DSL分析)及最终保存结果,整套数据处理分析流程,其中涉及到很多数据细节,完整代码如下:
import java.util.Properties
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/**
* 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
*/
object SparkTop10Movie {
def main(args: Array[String]): Unit = {
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 1. 读取电影评分数据,从本地文件系统读取
val rawRatingsDS: Dataset[String] = spark.read.textFile("datas/ml-1m/ratings.dat")
// 2. 转换数据
val ratingsDF: DataFrame = rawRatingsDS
// 过滤数据
.filter(line => null != line && line.trim.split("::").length == 4)
// 提取转换数据
.mapPartitions { iter =>
iter.map { line =>
// 按照分割符分割,拆箱到变量中
val Array(userId, movieId, rating, timestamp) = line.trim.split("::")
// 返回四元组
(userId, movieId, rating.toDouble, timestamp.toLong)
}
}
// 指定列名添加Schema
.toDF("userId", "movieId", "rating", "timestamp")
/*
root
|-- userId: string (nullable = true)
|-- movieId: string (nullable = true)
|-- rating: double (nullable = false)
|-- timestamp: long (nullable = false)
*/
//ratingsDF.printSchema()
/*
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
| 1| 1193| 5.0|978300760|
| 1| 661| 3.0|978302109|
| 1| 594| 4.0|978302268|
| 1| 919| 4.0|978301368|
+------+-------+------+---------+
*/
//ratingsDF.show(4)
// TODO: 基于SQL方式分析
// 第一步、注册DataFrame为临时视图
ratingsDF.createOrReplaceTempView("view_temp_ratings")
// 第二步、编写SQL
val top10MovieDF: DataFrame = spark.sql(
"""
|SELECT
| movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating
|FROM
| view_temp_ratings
|GROUP BY
| movieId
|HAVING
| cnt_rating > 2000
|ORDER BY
| avg_rating DESC, cnt_rating DESC
|LIMIT
| 10
""".stripMargin)
//top10MovieDF.printSchema()
top10MovieDF.show(10, truncate = false)
println("===============================================================")
// TODO: 基于DSL=Domain Special Language(特定领域语言) 分析
import org.apache.spark.sql.functions._
val resultDF: DataFrame = ratingsDF
// 选取字段
.select($"movieId", $"rating")
// 分组:按照电影ID,获取平均评分和评分次数
.groupBy($"movieId")
.agg( //
round(avg($"rating"), 2).as("avg_rating"), //
count($"movieId").as("cnt_rating") //
)
// 过滤:评分次数大于2000
.filter($"cnt_rating" > 2000)
// 排序:先按照评分降序,再按照次数降序
.orderBy($"avg_rating".desc, $"cnt_rating".desc)
// 获取前10
.limit(10)
//resultDF.printSchema()
resultDF.show(10)
// TODO: 将分析的结果数据保存MySQL数据库和CSV文件
// 结果DataFrame被使用多次,缓存
resultDF.persist(StorageLevel.MEMORY_AND_DISK)
// 1. 保存MySQL数据库表汇总
resultDF
.coalesce(1) // 考虑降低分区数目
.write
.mode("overwrite")
.option("driver", "com.mysql.cj.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.jdbc(
"jdbc:mysql://localhost:3306/test?serverTimezone=UTC&characterEncoding=utf8&useUnicode=true",
"test.tb_top10_movies",
new Properties()
)
// 2. 保存CSV文件:每行数据中个字段之间使用逗号隔开
resultDF
.coalesce(1)
.write.mode("overwrite")
.csv("datas/top10-movies")
// 释放缓存数据
resultDF.unpersist()
// 应用结束,关闭资源
Thread.sleep(10000000)
spark.stop()
}
}
运行上述程序时,查看WEB UI监控页面发现, 某个Stage中有200个Task任务,也就是说RDD有200分区Partition。
原因: 在SparkSQL中当Job中产生Shuffle时,默认的分区数(spark.sql.shuffle.partitions )为200,在实际项目中要合理的设置。 在构建SparkSession实例对象时,设置参数的值:
// 构建SparkSession实例对象
val spark: SparkSession = SparkSession.builder()
.master("local[4]")
.appName(this.getClass.getSimpleName.stripSuffix("$"))
// TODO: 设置shuffle时分区数目
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
下篇文章我们来看下DataSet的分析,以及RDD、DF与DS的转换。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。