赞
踩
具体的安装过程在我以前的博客里面有,大家可以通过以下链接进入操作:
Hadoop的安装:https://blog.csdn.net/weixin_47580081/article/details/108647420
Scala及Spark的安装:https://blog.csdn.net/weixin_47580081/article/details/114250894
提示:如果IDEA未构建Spark项目,可以转接到以下的博客:
IDEA使用Maven构建Spark项目:https://blog.csdn.net/weixin_47580081/article/details/115435536
master
slave1
slave2
将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。
{“id”:1,“name”:“Ella”,“age”:36}
{“id”:2,“name”:“Bob”,“age”:29}
{“id”:3,“name”:“Jack”,“age”:29}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:4,“name”:“Jim”,“age”:28}
{“id”:5,“name”:“Damon”}
{“id”:5,“name”:“Damon”}
(1) 查询所有数据;
源代码:
val df = spark.read.json("file:///opt/data/employee.json")
df.show()
运行截图:
(2) 查询所有数据,并去除重复的数据;
源代码:
df.distinct().show()
运行截图:
(3) 查询所有数据,打印时去除id字段;
源代码:
df.drop("id").show()
运行截图:
(4) 筛选出age>30的记录;
源代码:
df.filter(df("age")>30).show()
运行截图:
(5) 将数据按age分组;
源代码:
df.groupBy("age").count().show()
运行截图:
(6) 将数据按name升序排列;
源代码:
df.sort(df("name")).show()
运行截图:
(7) 取出前3行数据;
源代码:
df.head(3)
运行截图:
(8) 查询所有记录的name列,并为其取别名为username;
源代码:
df.select(df("name").alias("username")).show()
运行截图:
(9) 查询年龄age的平均值;
源代码:
df.agg("age"->"avg").show()
运行截图:
(10) 查询年龄age的最小值。
源代码:
df.agg("age"->"min").show()
运行截图:
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
源代码:
package com.John.Sparkstudy.sqlTest.Test02
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
/**
\* @author John
\* @Date 2021/4/15 17:44
*/
object project2 {
case class Employee(id:Long,name:String,age:Long)
def main(args: Array[String]) {
// 配置conf连接
val conf = new SparkConf()
conf.setMaster("local[3]")
.setAppName("RDDtoDF")
val sc = new SparkContext(conf)
// 创建 sparkSession
val spark = SparkSession.builder.getOrCreate
// 导入txt文件并转换为dataframe
import spark.implicits._
val employeeDF = sc.textFile("D:\\bigdata\\Spark分布式计算框架\\data\\employee.txt")
.map(_.split(","))
.map(attributes => Employee(attributes(0).trim.toInt, attributes(1),attributes(2).trim.toInt)).toDF()
// 创建临时视图,并用sparksql命令查询
employeeDF.createOrReplaceTempView("employee")
var employeeRDD = spark.sql("select id,name,age from employee")
// 设置输出格式
employeeRDD.map(t=>"id:"+t(0)+"name:"+t(1)+"age:"+t(2)).show()
}
}
运行截图:
(1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。
id | name | gender | Age |
---|---|---|---|
1 | Alice | F | 22 |
2 | John | M | 25 |
源代码:
create database sparktest;
use sparktest;
create table employee(id int(4),name char(50), gender char(20), age int(10));
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
运行截图:
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
id | name | gender | age |
---|---|---|---|
3 | Mary | F | 26 |
4 | Tom | M | 23 |
源代码:
package com.John.Sparkstudy.sqlTest.Test02
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
/**
\* @author John
\* @Date 2021/4/15 17:25
*/
object project3 {
def main(args: Array[String]): Unit = {
// 创建 sparkSession
val spark:SparkSession = SparkSession.builder()
.appName("mysql_spark")
.master("local[3]")
.getOrCreate()
// 创建需要插入数据
val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
val schema=StructType(List(StructField("id",IntegerType,true)
,StructField("name",StringType,true)
,StructField("gender",StringType,true)
, StructField("age",IntegerType,true)))
val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
val employeeDF=spark.createDataFrame(rowRDD,schema)
// 创建Properties对象,设置连接mysql的用户名和密码,并插入数据
val prop=new Properties()
prop.put("user","root")
prop.put("password","John123456")
prop.put("driver","com.mysql.jdbc.Driver")
employeeDF.write.mode("append")
.jdbc("jdbc:mysql://192.168.254.124:3306/sparktest","sparktest.employee",prop)
// 读取mysql中的数据
val jdbcDF = spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.254.124:3306/sparktest")
.option("driver","com.mysql.jdbc.Driver")
.option("dbtable", "employee")
.option("user", "root")
.option("password","John123456")
.load()
// 计算数据中age的最大值和总和
jdbcDF.agg("age" -> "max", "age" -> "sum").show()
}
}
运行截图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。