当前位置:   article > 正文

《Spark 编程基础(Scala 版)》第 6 章 Spark SQL 实验 5 Spark SQL 编程初级实践 (超级详细版)

《Spark 编程基础(Scala 版)》第 6 章 Spark SQL 实验 5 Spark SQL 编程初级实践 (超级详细版)

一、实验目的

(1)通过实验掌握 Spark SQL 的基本编程方法;

(2)熟悉 RDD 到 DataFrame 的转化方法;

(3)熟悉利用 Spark SQL 管理来自不同数据源的数据。

二、实验平台 操作系统: Ubuntu16.04 Spark 版本:2.1.0 数据库:MySQL

三、实验内容和要求

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。 { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } 为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

(1) 查询所有数据;

 (2) 查询所有数据,并去除重复的数据;

(3) 查询所有数据,打印时去除 id 字段;

(4) 筛选出 age>30 的记录;

(5) 将数据按 age 分组;

(6) 将数据按 name 升序排列;

(7) 取出前 3 行数据;

(8) 查询所有记录的 name 列,并为其取别名为 username;

(9) 查询年龄 age 的平均值;

(10) 查询年龄 age 第 2 页 的最小值。

1.再MobaXter里

cd`/home/hadoop

创建

vim employee.json`

把数据粘上去

2.再IDEA上

创建

case object SparkSQLjibencaozuo

里面代码

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
case object SparkSQLjibencaozuo{
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
    import spark.implicits._

    // 加载 JSON 数据到 DataFrame
    //val employeeDF = spark.read.json("employee.json")

    val employeeDF = spark.read.json("file:///home/hadoop/employee.json")

    // (1) 查询所有数据
    employeeDF.show()

    // (2) 查询所有数据,并去除重复的数据
    val distinctEmployeeDF = employeeDF.distinct()
    distinctEmployeeDF.show()

    // (3) 查询所有数据,打印时去除 id 字段
    employeeDF.select("name", "age").show()

    // (4) 筛选出 age>30 的记录
    employeeDF.filter($"age" > 30).show()

    // (5) 将数据按 age 分组
    employeeDF.groupBy("age").count().show()

    // (6) 将数据按 name 升序排列
    employeeDF.orderBy($"name".asc).show()

    // (7) 取出前 3 行数据
    employeeDF.limit(3).show()

    // (8) 查询所有记录的 name 列,并为其取别名为 username
    employeeDF.select($"name".as("username")).show()

    // (9) 查询年龄 age 的平均值
    employeeDF.select(avg($"age")).show()

    // (10) 查询年龄 age 的最小值
    employeeDF.select(min($"age")).show()

    // 停止 Spark 会话
    spark.stop()

  }
}
  1. package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.functions._
  4. case object SparkSQLjibencaozuo{
  5. def main(args: Array[String]): Unit = {
  6. val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
  7. import spark.implicits._
  8. // 加载 JSON 数据到 DataFrame
  9. //val employeeDF = spark.read.json("employee.json")
  10. val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
  11. // (1) 查询所有数据
  12. employeeDF.show()
  13. // (2) 查询所有数据,并去除重复的数据
  14. val distinctEmployeeDF = employeeDF.distinct()
  15. distinctEmployeeDF.show()
  16. // (3) 查询所有数据,打印时去除 id 字段
  17. employeeDF.select("name", "age").show()
  18. // (4) 筛选出 age>30 的记录
  19. employeeDF.filter($"age" > 30).show()
  20. // (5) 将数据按 age 分组
  21. employeeDF.groupBy("age").count().show()
  22. // (6) 将数据按 name 升序排列
  23. employeeDF.orderBy($"name".asc).show()
  24. // (7) 取出前 3 行数据
  25. employeeDF.limit(3).show()
  26. // (8) 查询所有记录的 name 列,并为其取别名为 username
  27. employeeDF.select($"name".as("username")).show()
  28. // (9) 查询年龄 age 的平均值
  29. employeeDF.select(avg($"age")).show()
  30. // (10) 查询年龄 age 的最小值
  31. employeeDF.select(min($"age")).show()
  32. // 停止 Spark 会话
  33. spark.stop()
  34. }
  35. }

注意一下

```
    // 加载 JSON 数据到 DataFrame
    //val employeeDF = spark.read.json("employee.json")

    val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
```

记好employee.json文件在哪个目录下

1.运行代码

有这个结果就行了,不需要在意红色
Process finished with exit code 1

2.打包项目 build->package->git add jar->commit->push,在虚拟机中 git pull origin master 上传的 jar 包

出现这个结果就正确的

3.以 spark-local 模式提交 spark 任务
先  cd /opt/module/spark-local/
再运行
bin/spark-submit --master local[*] --jars /opt/module/spark-local/jars/mysql-connector-java-5.1.27-bin.jar --class IDEA自己文件的地址 ~/gitdata/target/scala_demo-1.0-SNAPSHOT.jar

4.10结果就出来了

2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):

1,Ella,36 2,Bob,29 3,Jack,29

请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码。

employee.txt 如上面一样

再IDEA上建立RDDToDataFrameExample

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types._
case object RDDToDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("RDD to DataFrame Example")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 指定employee.txt文件的位置
    val inputFilePath = "file:///home/hadoop/employee.txt"

    // 从文本文件读取数据创建RDD
    val rdd = spark.sparkContext.textFile(inputFilePath)

    // 定义DataFrame的schema
    val schema = StructType(Array(
      StructField("id", IntegerType, nullable = false),
      StructField("name", StringType, nullable = false),
      StructField("age", IntegerType, nullable = false)
    ))

    // 将RDD转换为DataFrame
    val dataFrame = spark.createDataFrame(rdd.map { line =>
      val parts = line.split(",")
      Row(parts(0).toInt, parts(1), parts(2).toInt)
    }, schema)

    // 显示DataFrame内容
    dataFrame.show(false)

    // 按照指定格式打印所有数据
    dataFrame.collect().foreach { row =>
      println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
    }

    // 停止SparkSession
    spark.stop()
  }

}
  1. package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
  2. import org.apache.spark.sql.{SparkSession, Row}
  3. import org.apache.spark.sql.types._
  4. case object RDDToDataFrameExample{
  5. def main(args: Array[String]): Unit = {
  6. // 创建SparkSession
  7. val spark = SparkSession.builder()
  8. .appName("RDD to DataFrame Example")
  9. .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
  10. .getOrCreate()
  11. import spark.implicits._
  12. // 指定employee.txt文件的位置
  13. val inputFilePath = "file:///home/hadoop/employee.txt"
  14. // 从文本文件读取数据创建RDD
  15. val rdd = spark.sparkContext.textFile(inputFilePath)
  16. // 定义DataFrame的schema
  17. val schema = StructType(Array(
  18. StructField("id", IntegerType, nullable = false),
  19. StructField("name", StringType, nullable = false),
  20. StructField("age", IntegerType, nullable = false)
  21. ))
  22. // 将RDD转换为DataFrame
  23. val dataFrame = spark.createDataFrame(rdd.map { line =>
  24. val parts = line.split(",")
  25. Row(parts(0).toInt, parts(1), parts(2).toInt)
  26. }, schema)
  27. // 显示DataFrame内容
  28. dataFrame.show(false)
  29. // 按照指定格式打印所有数据
  30. dataFrame.collect().foreach { row =>
  31. println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
  32. }
  33. // 停止SparkSession
  34. spark.stop()
  35. }
  36. }

1.建表

CREATE DATABASE sparktest;  
USE sparktest;  
  
CREATE TABLE employee (  
  id INT PRIMARY KEY,  
  name VARCHAR(50),  
  gender CHAR(1),  
  age INT  
);  
  
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
————————————————

.

再IDEA上建立MySQLDataFrameExample

package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum

case object MySQLDataFrameExample{
  def main(args: Array[String]): Unit = {
    // 创建SparkSession
    val spark = SparkSession.builder()
      .appName("MySQL DataFrame Example MySQL写入与读取")
      .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
      .getOrCreate()

    import spark.implicits._

    // 配置MySQL JDBC连接
    val jdbcProperties = new Properties()
    jdbcProperties.setProperty("user", "root")
    jdbcProperties.setProperty("password", "自己的密码")
    jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
    //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")

    // 定义MySQL的JDBC连接URL
    val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"

    // 创建DataFrame以插入数据
    val newEmployeeData = Seq(
      (3, "Mary", "F", 26),
      (4, "Tom", "M", 23)
    ).toDF("id", "name", "gender", "age")

    // 将DataFrame数据插入到MySQL的employee表中
    newEmployeeData.write
      .mode("append") // 使用append模式来添加数据,而不是覆盖
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 从MySQL读取employee表的数据
    val employeeDF = spark.read
      .jdbc(jdbcUrl, "employee", jdbcProperties)

    // 打印age的最大值
    val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
    println(s"Max age: $maxAge")

    // 打印age的总和
    val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
    println(s"Sum of ages: $sumAge")

    // 停止SparkSession
    spark.stop()
  }
}
  1. package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
  2. import org.apache.spark.sql.{SparkSession, Row}
  3. import java.util.Properties
  4. import org.apache.spark.sql.SparkSession
  5. import org.apache.spark.sql.Dataset
  6. import org.apache.spark.sql.Row
  7. import org.apache.spark.sql.functions.max
  8. import org.apache.spark.sql.functions.sum
  9. case object MySQLDataFrameExample{
  10. def main(args: Array[String]): Unit = {
  11. // 创建SparkSession
  12. val spark = SparkSession.builder()
  13. .appName("MySQL DataFrame Example MySQL写入与读取")
  14. .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
  15. .getOrCreate()
  16. import spark.implicits._
  17. // 配置MySQL JDBC连接
  18. val jdbcProperties = new Properties()
  19. jdbcProperties.setProperty("user", "root")
  20. jdbcProperties.setProperty("password", "自己的")
  21. jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
  22. //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
  23. // 定义MySQL的JDBC连接URL
  24. val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"
  25. // 创建DataFrame以插入数据
  26. val newEmployeeData = Seq(
  27. (3, "Mary", "F", 26),
  28. (4, "Tom", "M", 23)
  29. ).toDF("id", "name", "gender", "age")
  30. // 将DataFrame数据插入到MySQL的employee表中
  31. newEmployeeData.write
  32. .mode("append") // 使用append模式来添加数据,而不是覆盖
  33. .jdbc(jdbcUrl, "employee", jdbcProperties)
  34. // 从MySQL读取employee表的数据
  35. val employeeDF = spark.read
  36. .jdbc(jdbcUrl, "employee", jdbcProperties)
  37. // 打印age的最大值
  38. val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
  39. println(s"Max age: $maxAge")
  40. // 打印age的总和
  41. val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
  42. println(s"Sum of ages: $sumAge")
  43. // 停止SparkSession
  44. spark.stop()
  45. }
  46. }

再如实验一,上创到虚拟机上查看结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/609391
推荐阅读
相关标签
  

闽ICP备14008679号