赞
踩
一、实验目的
(1)通过实验掌握 Spark SQL 的基本编程方法;
(2)熟悉 RDD 到 DataFrame 的转化方法;
(3)熟悉利用 Spark SQL 管理来自不同数据源的数据。
二、实验平台 操作系统: Ubuntu16.04 Spark 版本:2.1.0 数据库:MySQL
三、实验内容和要求
1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。 { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } 为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除 id 字段;
(4) 筛选出 age>30 的记录;
(5) 将数据按 age 分组;
(6) 将数据按 name 升序排列;
(7) 取出前 3 行数据;
(8) 查询所有记录的 name 列,并为其取别名为 username;
(9) 查询年龄 age 的平均值;
(10) 查询年龄 age 第 2 页 的最小值。
1.再MobaXter里
cd`/home/hadoop
创建
vim employee.json`
把数据粘上去
2.再IDEA上
创建
case object SparkSQLjibencaozuo
里面代码
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ case object SparkSQLjibencaozuo{ def main(args: Array[String]): Unit = { val spark = SparkSession.builder.appName("EmployeeData").getOrCreate() import spark.implicits._ // 加载 JSON 数据到 DataFrame //val employeeDF = spark.read.json("employee.json") val employeeDF = spark.read.json("file:///home/hadoop/employee.json") // (1) 查询所有数据 employeeDF.show() // (2) 查询所有数据,并去除重复的数据 val distinctEmployeeDF = employeeDF.distinct() distinctEmployeeDF.show() // (3) 查询所有数据,打印时去除 id 字段 employeeDF.select("name", "age").show() // (4) 筛选出 age>30 的记录 employeeDF.filter($"age" > 30).show() // (5) 将数据按 age 分组 employeeDF.groupBy("age").count().show() // (6) 将数据按 name 升序排列 employeeDF.orderBy($"name".asc).show() // (7) 取出前 3 行数据 employeeDF.limit(3).show() // (8) 查询所有记录的 name 列,并为其取别名为 username employeeDF.select($"name".as("username")).show() // (9) 查询年龄 age 的平均值 employeeDF.select(avg($"age")).show() // (10) 查询年龄 age 的最小值 employeeDF.select(min($"age")).show() // 停止 Spark 会话 spark.stop() } }
- package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.functions._
- case object SparkSQLjibencaozuo{
- def main(args: Array[String]): Unit = {
- val spark = SparkSession.builder.appName("EmployeeData").getOrCreate()
- import spark.implicits._
-
- // 加载 JSON 数据到 DataFrame
- //val employeeDF = spark.read.json("employee.json")
-
- val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
-
- // (1) 查询所有数据
- employeeDF.show()
-
- // (2) 查询所有数据,并去除重复的数据
- val distinctEmployeeDF = employeeDF.distinct()
- distinctEmployeeDF.show()
-
- // (3) 查询所有数据,打印时去除 id 字段
- employeeDF.select("name", "age").show()
-
- // (4) 筛选出 age>30 的记录
- employeeDF.filter($"age" > 30).show()
-
- // (5) 将数据按 age 分组
- employeeDF.groupBy("age").count().show()
-
- // (6) 将数据按 name 升序排列
- employeeDF.orderBy($"name".asc).show()
-
- // (7) 取出前 3 行数据
- employeeDF.limit(3).show()
-
- // (8) 查询所有记录的 name 列,并为其取别名为 username
- employeeDF.select($"name".as("username")).show()
-
- // (9) 查询年龄 age 的平均值
- employeeDF.select(avg($"age")).show()
-
- // (10) 查询年龄 age 的最小值
- employeeDF.select(min($"age")).show()
-
- // 停止 Spark 会话
- spark.stop()
-
- }
- }
注意一下
```
// 加载 JSON 数据到 DataFrame
//val employeeDF = spark.read.json("employee.json")
val employeeDF = spark.read.json("file:///home/hadoop/employee.json")
```
记好employee.json文件在哪个目录下
1.运行代码
有这个结果就行了,不需要在意红色
Process finished with exit code 1
2.打包项目 build->package->git add jar->commit->push,在虚拟机中 git pull origin master 上传的 jar 包
出现这个结果就正确的
3.以 spark-local 模式提交 spark 任务
先 cd /opt/module/spark-local/
再运行
bin/spark-submit --master local[*] --jars /opt/module/spark-local/jars/mysql-connector-java-5.1.27-bin.jar --class IDEA自己文件的地址 ~/gitdata/target/scala_demo-1.0-SNAPSHOT.jar
4.10结果就出来了
2.编程实现将 RDD 转换为 DataFrame 源文件内容如下(包含 id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29
请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代 码。
employee.txt 如上面一样
再IDEA上建立RDDToDataFrameExample
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame import org.apache.spark.sql.{SparkSession, Row} import org.apache.spark.sql.types._ case object RDDToDataFrameExample{ def main(args: Array[String]): Unit = { // 创建SparkSession val spark = SparkSession.builder() .appName("RDD to DataFrame Example") .master("local[*]") // 使用本地模式,如果连接到集群请更改这里 .getOrCreate() import spark.implicits._ // 指定employee.txt文件的位置 val inputFilePath = "file:///home/hadoop/employee.txt" // 从文本文件读取数据创建RDD val rdd = spark.sparkContext.textFile(inputFilePath) // 定义DataFrame的schema val schema = StructType(Array( StructField("id", IntegerType, nullable = false), StructField("name", StringType, nullable = false), StructField("age", IntegerType, nullable = false) )) // 将RDD转换为DataFrame val dataFrame = spark.createDataFrame(rdd.map { line => val parts = line.split(",") Row(parts(0).toInt, parts(1), parts(2).toInt) }, schema) // 显示DataFrame内容 dataFrame.show(false) // 按照指定格式打印所有数据 dataFrame.collect().foreach { row => println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}") } // 停止SparkSession spark.stop() } }
- package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
- import org.apache.spark.sql.{SparkSession, Row}
- import org.apache.spark.sql.types._
- case object RDDToDataFrameExample{
- def main(args: Array[String]): Unit = {
- // 创建SparkSession
- val spark = SparkSession.builder()
- .appName("RDD to DataFrame Example")
- .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
- .getOrCreate()
-
- import spark.implicits._
-
- // 指定employee.txt文件的位置
- val inputFilePath = "file:///home/hadoop/employee.txt"
-
- // 从文本文件读取数据创建RDD
- val rdd = spark.sparkContext.textFile(inputFilePath)
-
- // 定义DataFrame的schema
- val schema = StructType(Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false)
- ))
-
- // 将RDD转换为DataFrame
- val dataFrame = spark.createDataFrame(rdd.map { line =>
- val parts = line.split(",")
- Row(parts(0).toInt, parts(1), parts(2).toInt)
- }, schema)
-
- // 显示DataFrame内容
- dataFrame.show(false)
-
- // 按照指定格式打印所有数据
- dataFrame.collect().foreach { row =>
- println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
- }
-
- // 停止SparkSession
- spark.stop()
- }
-
-
-
- }
1.建表
CREATE DATABASE sparktest;
USE sparktest;
CREATE TABLE employee (
id INT PRIMARY KEY,
name VARCHAR(50),
gender CHAR(1),
age INT
);
INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);
INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
————————————————
.
再IDEA上建立MySQLDataFrameExample
package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame import org.apache.spark.sql.{SparkSession, Row} import java.util.Properties import org.apache.spark.sql.SparkSession import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.functions.max import org.apache.spark.sql.functions.sum case object MySQLDataFrameExample{ def main(args: Array[String]): Unit = { // 创建SparkSession val spark = SparkSession.builder() .appName("MySQL DataFrame Example MySQL写入与读取") .master("local[*]") // 使用本地模式,如果连接到集群请更改这里 .getOrCreate() import spark.implicits._ // 配置MySQL JDBC连接 val jdbcProperties = new Properties() jdbcProperties.setProperty("user", "root") jdbcProperties.setProperty("password", "自己的密码") jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver") //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver") // 定义MySQL的JDBC连接URL val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest" // 创建DataFrame以插入数据 val newEmployeeData = Seq( (3, "Mary", "F", 26), (4, "Tom", "M", 23) ).toDF("id", "name", "gender", "age") // 将DataFrame数据插入到MySQL的employee表中 newEmployeeData.write .mode("append") // 使用append模式来添加数据,而不是覆盖 .jdbc(jdbcUrl, "employee", jdbcProperties) // 从MySQL读取employee表的数据 val employeeDF = spark.read .jdbc(jdbcUrl, "employee", jdbcProperties) // 打印age的最大值 val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0) println(s"Max age: $maxAge") // 打印age的总和 val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0) println(s"Sum of ages: $sumAge") // 停止SparkSession spark.stop() } }
- package edu.hnuahe.wanqing.spark_6_3_CreateDataFrame
- import org.apache.spark.sql.{SparkSession, Row}
- import java.util.Properties
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.functions.max
- import org.apache.spark.sql.functions.sum
-
- case object MySQLDataFrameExample{
- def main(args: Array[String]): Unit = {
- // 创建SparkSession
- val spark = SparkSession.builder()
- .appName("MySQL DataFrame Example MySQL写入与读取")
- .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
- .getOrCreate()
-
- import spark.implicits._
-
- // 配置MySQL JDBC连接
- val jdbcProperties = new Properties()
- jdbcProperties.setProperty("user", "root")
- jdbcProperties.setProperty("password", "自己的")
- jdbcProperties.setProperty("driver", "com.mysql.jdbc.Driver")
- //jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
-
- // 定义MySQL的JDBC连接URL
- val jdbcUrl = "jdbc:mysql://自己的主机:3306/sparktest"
-
- // 创建DataFrame以插入数据
- val newEmployeeData = Seq(
- (3, "Mary", "F", 26),
- (4, "Tom", "M", 23)
- ).toDF("id", "name", "gender", "age")
-
- // 将DataFrame数据插入到MySQL的employee表中
- newEmployeeData.write
- .mode("append") // 使用append模式来添加数据,而不是覆盖
- .jdbc(jdbcUrl, "employee", jdbcProperties)
-
- // 从MySQL读取employee表的数据
- val employeeDF = spark.read
- .jdbc(jdbcUrl, "employee", jdbcProperties)
-
- // 打印age的最大值
- val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
- println(s"Max age: $maxAge")
-
- // 打印age的总和
- val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
- println(s"Sum of ages: $sumAge")
-
- // 停止SparkSession
- spark.stop()
- }
- }
再如实验一,上创到虚拟机上查看结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。