赞
踩
Spark SQL编程初级实践
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name":"Jack","age":29 } { "id":4 , "name":"Jim","age":28 } { "id":4 , "name":"Jim","age":28 } { "id":5 , "name":"Damon" } { "id":5 , "name":"Damon" } |
为employee.json创建DataFrame,并写出Scala语句完成下列操作:
源文件内容如下(包含id,name,age):
1,Ella,36 2,Bob,29 3,Jack,29 |
请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。
表6-2 employee表原有数据
id | name | gender | Age |
1 | Alice | F | 22 |
2 | John | M | 25 |
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表6-3 employee表新增数据
id | name | gender | age |
3 | Mary | F | 26 |
4 | Tom | M | 23 |
实验一 :Spark SQL基本操作
- 1)
- // 导入必要的库
- import org.apache.spark.sql.SparkSession
-
- // 创建SparkSession
- val spark = SparkSession.builder()
- .appName("Spark SQL Basic Operations")
- .getOrCreate()
-
- // 读取JSON文件创建DataFrame
- val df = spark.read.json("file:///home/hadoop/employee.json")
- // (1) 查询所有数据
- df.show()
- (2)查询所有数据,并去除重复的数据
- df.distinct().show()
-
- (3)
- 查询所有数据,打印时去除id字段
- df.drop("id").show()
-
- (4)
- 筛选出age>30的记录
- df.filter("age > 30").show()
-
- (5)
- 将数据按age分组
- df.groupBy("age").count().show()
-
-
- (6)
- 将数据按name升序排列
- df.orderBy("name").show()
-
-
- (7)
- 取出前3行数据
- df.limit(3).show()
-
- (8)
- 查询所有记录的name列,并为其取别名为username
- df.select($"name".alias("username")).show()
-
- (9)
- 查询年龄age的平均值
- df.selectExpr("avg(age)").show()
-
- (10)
- 查询年龄age的最小值
- df.selectExpr("min(age)").show()
实验二 :编程实现将RDD转换为DataFrame
编程代码:
- import org.apache.spark.sql.{SparkSession, Row}
- import org.apache.spark.sql.types._
-
- object RDDToDataFrameExample {
- def main(args: Array[String]): Unit = {
- // 创建SparkSession
- val spark = SparkSession.builder()
- .appName("RDD to DataFrame Example")
- .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
- .getOrCreate()
-
- import spark.implicits._
-
- // 指定employee.txt文件的位置
- val inputFilePath = "file:///home/hadoop/employee.txt"
-
- // 从文本文件读取数据创建RDD
- val rdd = spark.sparkContext.textFile(inputFilePath)
-
- // 定义DataFrame的schema
- val schema = StructType(Array(
- StructField("id", IntegerType, nullable = false),
- StructField("name", StringType, nullable = false),
- StructField("age", IntegerType, nullable = false)
- ))
-
- // 将RDD转换为DataFrame
- val dataFrame = spark.createDataFrame(rdd.map { line =>
- val parts = line.split(",")
- Row(parts(0).toInt, parts(1), parts(2).toInt)
- }, schema)
-
- // 显示DataFrame内容
- dataFrame.show(false)
-
- // 按照指定格式打印所有数据
- dataFrame.collect().foreach { row =>
- println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")
- }
-
- // 停止SparkSession
- spark.stop()
- }
- }
命令
/usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
具体操作参考博客
如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客
实验三:编程实现利用DataFrame读写MySQL的数据
mysql代码
- CREATE DATABASE sparktest;
- USE sparktest;
-
- CREATE TABLE employee (
- id INT PRIMARY KEY,
- name VARCHAR(50),
- gender CHAR(1),
- age INT
- );
-
- INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);
- INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);
如何安装msyql参考博客
在ubuntu上安装mysql(在线安装需要)-CSDN博客
编程代码
- import org.apache.spark.sql.{SparkSession, Row}
- import java.util.Properties
- import org.apache.spark.sql.SparkSession
- import org.apache.spark.sql.Dataset
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.functions.max
- import org.apache.spark.sql.functions.sum
-
- object MySQLDataFrameExample {
- def main(args: Array[String]): Unit = {
- // 创建SparkSession
- val spark = SparkSession.builder()
- .appName("MySQL DataFrame Example")
- .master("local[*]") // 使用本地模式,如果连接到集群请更改这里
- .getOrCreate()
-
- import spark.implicits._
-
- // 配置MySQL JDBC连接
- val jdbcProperties = new Properties()
- jdbcProperties.setProperty("user", "root")
- jdbcProperties.setProperty("password", "mysql")
- jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
-
- // 定义MySQL的JDBC连接URL
- val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"
-
- // 创建DataFrame以插入数据
- val newEmployeeData = Seq(
- (3, "Mary", "F", 26),
- (4, "Tom", "M", 23)
- ).toDF("id", "name", "gender", "age")
-
- // 将DataFrame数据插入到MySQL的employee表中
- newEmployeeData.write
- .mode("append") // 使用append模式来添加数据,而不是覆盖
- .jdbc(jdbcUrl, "employee", jdbcProperties)
-
- // 从MySQL读取employee表的数据
- val employeeDF = spark.read
- .jdbc(jdbcUrl, "employee", jdbcProperties)
-
- // 打印age的最大值
- val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)
- println(s"Max age: $maxAge")
-
- // 打印age的总和
- val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)
- println(s"Sum of ages: $sumAge")
-
- // 停止SparkSession
- spark.stop()
- }
- }
编程详细步骤参考
如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客
运行命令
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
产生错误
主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接
命令更新为
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
加了一个mysl驱动的jar的引用
如何安装mysql驱动参考博客
打包失败
这个问题是代码错误
代码未引入一些包
加上下面这些就可以了
import org.apache.spark.sql.{SparkSession, Row}
import java.util.Properties
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.functions.sum
运行失败
未引入mysl驱动程序
要下载mysql驱动
采用命令引入
/usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar
参考链接
如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。