当前位置:   article > 正文

实验五 Spark SQL编程初级实践

实验五 Spark SQL编程初级实践

Spark SQL编程初级实践

  • Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }

{ "id":2, "name":"Bob","age":29 }

{ "id":3 , "name":"Jack","age":29 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":4 , "name":"Jim","age":28 }

{ "id":5 , "name":"Damon" }

{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Scala语句完成下列操作:

  1. 查询所有数据;
  2. 查询所有数据,并去除重复的数据;
  3. 查询所有数据,打印时去除id字段;
  4. 筛选出age>30的记录;
  5. 将数据按age分组;
  6. 将数据按name升序排列;
  7. 取出前3行数据;
  8. 查询所有记录的name列,并为其取别名为username;
  9. 查询年龄age的平均值;
  10. 查询年龄age的最小值。

  • 编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29

将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

  • 编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表原有数据

id

name

gender

Age

1

Alice

F

22

2

John

M

25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表新增数据

id

name

gender

age

3

Mary

F

26

4

Tom

M

23

实验一 :Spark SQL基本操作

  1. 1
  2. // 导入必要的库
  3. import org.apache.spark.sql.SparkSession
  4. // 创建SparkSession
  5. val spark = SparkSession.builder()
  6. .appName("Spark SQL Basic Operations")
  7. .getOrCreate()
  8. // 读取JSON文件创建DataFrame
  9. val df = spark.read.json("file:///home/hadoop/employee.json")
  10. // (1) 查询所有数据
  11. df.show()
  12. 2)查询所有数据,并去除重复的数据
  13. df.distinct().show()
  14. 3
  15. 查询所有数据,打印时去除id字段
  16. df.drop("id").show()
  17. 4
  18. 筛选出age>30的记录
  19. df.filter("age > 30").show()
  20. 5
  21. 将数据按age分组
  22. df.groupBy("age").count().show()
  23. 6
  24. 将数据按name升序排列
  25. df.orderBy("name").show()
  26. 7
  27. 取出前3行数据
  28. df.limit(3).show()
  29. 8
  30. 查询所有记录的name列,并为其取别名为username
  31. df.select($"name".alias("username")).show()
  32. 9
  33. 查询年龄age的平均值
  34. df.selectExpr("avg(age)").show()
  35. 10
  36. 查询年龄age的最小值
  37. df.selectExpr("min(age)").show()

实验二 :编程实现将RDD转换为DataFrame

编程代码:

  1. import org.apache.spark.sql.{SparkSession, Row}
  2. import org.apache.spark.sql.types._
  3. object RDDToDataFrameExample {
  4. def main(args: Array[String]): Unit = {
  5. // 创建SparkSession
  6. val spark = SparkSession.builder()
  7. .appName("RDD to DataFrame Example")
  8. .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
  9. .getOrCreate()
  10. import spark.implicits._
  11. // 指定employee.txt文件的位置
  12. val inputFilePath = "file:///home/hadoop/employee.txt"
  13. // 从文本文件读取数据创建RDD
  14. val rdd = spark.sparkContext.textFile(inputFilePath)
  15. // 定义DataFrame的schema
  16. val schema = StructType(Array(
  17. StructField("id", IntegerType, nullable = false),
  18. StructField("name", StringType, nullable = false),
  19. StructField("age", IntegerType, nullable = false)
  20. ))
  21. // 将RDD转换为DataFrame
  22. val dataFrame = spark.createDataFrame(rdd.map { line =>
  23. val parts = line.split(",")
  24. Row(parts(0).toInt, parts(1), parts(2).toInt)
  25. }, schema)
  26. // 显示DataFrame内容
  27. dataFrame.show(false)
  28. // 按照指定格式打印所有数据
  29. dataFrame.collect().foreach { row =>
  30. println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
  31. }
  32. // 停止SparkSession
  33. spark.stop()
  34. }
  35. }

 命令

/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

 具体操作参考博客

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

实验三:编程实现利用DataFrame读写MySQL的数据

mysql代码

  1. CREATE DATABASE sparktest;
  2. USE sparktest;
  3. CREATE TABLE employee (
  4. id INT PRIMARY KEY,
  5. name VARCHAR(50),
  6. gender CHAR(1),
  7. age INT
  8. );
  9. INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);
  10. INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);

如何安装msyql参考博客

 在ubuntu上安装mysql(在线安装需要)-CSDN博客

如何安装mysl驱动程序jar包-CSDN博客

编程代码

  1. import org.apache.spark.sql.{SparkSession, Row}
  2. import java.util.Properties
  3. import org.apache.spark.sql.SparkSession
  4. import org.apache.spark.sql.Dataset
  5. import org.apache.spark.sql.Row
  6. import org.apache.spark.sql.functions.max
  7. import org.apache.spark.sql.functions.sum
  8. object MySQLDataFrameExample {
  9. def main(args: Array[String]): Unit = {
  10. // 创建SparkSession
  11. val spark = SparkSession.builder()
  12. .appName("MySQL DataFrame Example")
  13. .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
  14. .getOrCreate()
  15. import spark.implicits._
  16. // 配置MySQL JDBC连接
  17. val jdbcProperties = new Properties()
  18. jdbcProperties.setProperty("user", "root")
  19. jdbcProperties.setProperty("password", "mysql")
  20. jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
  21. // 定义MySQL的JDBC连接URL
  22. val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"
  23. // 创建DataFrame以插入数据
  24. val newEmployeeData = Seq(
  25. (3, "Mary", "F", 26),
  26. (4, "Tom", "M", 23)
  27. ).toDF("id", "name", "gender", "age")
  28. // 将DataFrame数据插入到MySQL的employee表中
  29. newEmployeeData.write
  30. .mode("append") // 使用append模式来添加数据,而不是覆盖
  31. .jdbc(jdbcUrl, "employee", jdbcProperties)
  32. // 从MySQL读取employee表的数据
  33. val employeeDF = spark.read
  34. .jdbc(jdbcUrl, "employee", jdbcProperties)
  35. // 打印age的最大值
  36. val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
  37. println(s"Max age: $maxAge")
  38. // 打印age的总和
  39. val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
  40. println(s"Sum of ages: $sumAge")
  41. // 停止SparkSession
  42. spark.stop()
  43. }
  44. }

编程详细步骤参考

 如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

 运行命令

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

产生错误

主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接

命令更新为

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

加了一个mysl驱动的jar的引用

如何安装mysql驱动参考博客

如何安装mysl驱动程序jar包-CSDN博客

打包失败

这个问题是代码错误

代码未引入一些包

加上下面这些就可以了

import org.apache.spark.sql.{SparkSession, Row}  

import java.util.Properties  

import org.apache.spark.sql.SparkSession  

import org.apache.spark.sql.Dataset  

import org.apache.spark.sql.Row  

import org.apache.spark.sql.functions.max  

import org.apache.spark.sql.functions.sum  

运行失败

未引入mysl驱动程序

要下载mysql驱动

采用命令引入

/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

参考链接

如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

在ubuntu上安装mysql(在线安装需要)-CSDN博客

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/507080
推荐阅读
相关标签
  

闽ICP备14008679号