当前位置:   article > 正文

Spark大数据分析与实战:Spark SQL编程初级实践_spark sql编程初级实战实验报告

spark sql编程初级实战实验报告

Spark大数据分析与实战:Spark SQL编程初级实践

一、安装Hadoop和Spark

具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:

Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894

提示:如果IDEA未构建Spark项目,可以转接到以下的博客:

IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536

二、启动Hadoop与Spark

查看3个节点的进程

master在这里插入图片描述
slave1
在这里插入图片描述
slave2
在这里插入图片描述

三、Spark SQL基本操作

将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

{“id”:1,“name”:“Ella”,“age”:36}
{“id”:2,“name”:“Bob”,“age”:29}
{“id”:3,“name”:“Jack”,“age”:29}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:5,“name”:“Damon”}
{“id”:5,“name”:“Damon”}
在这里插入图片描述

(1) 查询所有数据;

源代码:

val df = spark.read.json("file:///opt/data/employee.json")

df.show()
  • 1
  • 2
  • 3

运行截图:
在这里插入图片描述

(2) 查询所有数据,并去除重复的数据;

源代码:

df.distinct().show()
  • 1

运行截图:
在这里插入图片描述

(3) 查询所有数据,打印时去除id字段;

源代码:

df.drop("id").show()
  • 1

运行截图:
在这里插入图片描述

(4) 筛选出age>30的记录;

源代码:

df.filter(df("age")>30).show()
  • 1

运行截图:
在这里插入图片描述
(5) 将数据按age分组;

源代码:

df.groupBy("age").count().show()
  • 1

运行截图:
在这里插入图片描述

(6) 将数据按name升序排列;

源代码:

df.sort(df("name")).show()
  • 1

运行截图:在这里插入图片描述

(7) 取出前3行数据;

源代码:

df.head(3)
  • 1

运行截图:
在这里插入图片描述

(8) 查询所有记录的name列,并为其取别名为username;

源代码:

df.select(df("name").alias("username")).show()
  • 1

运行截图:

在这里插入图片描述
(9) 查询年龄age的平均值;

源代码:

df.agg("age"->"avg").show()
  • 1

运行截图:
在这里插入图片描述
(10) 查询年龄age的最小值。

源代码:

df.agg("age"->"min").show()
  • 1

运行截图:
在这里插入图片描述

四、编程实现将RDD转换为DataFrame

源文件内容如下(包含id,name,age):

1,Ella,36

2,Bob,29

3,Jack,29
在这里插入图片描述

源代码:

package com.John.Sparkstudy.sqlTest.Test02

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
 \* @author John
 \* @Date 2021/4/15 17:44
 */

object project2 {

 case class Employee(id:Long,name:String,age:Long)

 def main(args: Array[String]) {

  // 配置conf连接
  val conf = new SparkConf()
  conf.setMaster("local[3]")
   .setAppName("RDDtoDF")
  val sc = new SparkContext(conf)
      
  // 创建 sparkSession
  val spark = SparkSession.builder.getOrCreate

  // 导入txt文件并转换为dataframe
  import spark.implicits._
  val employeeDF = sc.textFile("D:\\bigdata\\Spark分布式计算框架\\data\\employee.txt")
   .map(_.split(","))
   .map(attributes => Employee(attributes(0).trim.toInt, attributes(1),attributes(2).trim.toInt)).toDF()

  // 创建临时视图,并用sparksql命令查询
  employeeDF.createOrReplaceTempView("employee")
  var employeeRDD = spark.sql("select id,name,age from employee")

  // 设置输出格式
  employeeRDD.map(t=>"id:"+t(0)+"name:"+t(1)+"age:"+t(2)).show()

 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

运行截图:

在这里插入图片描述

五、编程实现利用DataFrame读写MySQL的数据

(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

表6-2 employee表 原有数据
idnamegenderAge
1AliceF22
2JohnM25

源代码:

create database sparktest;

use sparktest;

create table employee(id int(4),name char(50), gender char(20), age int(10));

insert into employee values(1,'Alice','F',22);

insert into employee values(2,'John','M',25);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

运行截图:在这里插入图片描述

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表6-3 employee表 新增数据
idnamegenderage
3MaryF26
4TomM23

源代码:

package com.John.Sparkstudy.sqlTest.Test02

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

/**
 \* @author John
 \* @Date 2021/4/15 17:25
 */

object project3 {

 def main(args: Array[String]): Unit = {
 
  // 创建 sparkSession
  val spark:SparkSession = SparkSession.builder()
   .appName("mysql_spark")
   .master("local[3]")
   .getOrCreate()

  // 创建需要插入数据
  val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
  val schema=StructType(List(StructField("id",IntegerType,true)
   ,StructField("name",StringType,true)
   ,StructField("gender",StringType,true)
   , StructField("age",IntegerType,true)))
  val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
  val employeeDF=spark.createDataFrame(rowRDD,schema)

  // 创建Properties对象,设置连接mysql的用户名和密码,并插入数据
  val prop=new Properties()
  prop.put("user","root")
  prop.put("password","John123456")
  prop.put("driver","com.mysql.jdbc.Driver")
  employeeDF.write.mode("append")
   .jdbc("jdbc:mysql://192.168.254.124:3306/sparktest","sparktest.employee",prop)
   
  // 读取mysql中的数据
  val jdbcDF = spark.read.format("jdbc")
   .option("url","jdbc:mysql://192.168.254.124:3306/sparktest")
   .option("driver","com.mysql.jdbc.Driver")
   .option("dbtable", "employee")
   .option("user", "root")
   .option("password","John123456")
   .load()

  // 计算数据中age的最大值和总和
  jdbcDF.agg("age" -> "max", "age" -> "sum").show()
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

运行截图:
在这里插入图片描述在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/603629
推荐阅读
相关标签
  

闽ICP备14008679号