赞
踩
数据:employee.json
格式:
{ "id":1 ,"name":" Ella","age":36 }
{ "id":2,"name":"Bob","age":29 }
{ "id":3 ,"name":"Jack","age":29 }
{ "id":4 ,"name":"Jim","age":28 }
{ "id":5 ,"name":"Damon" }
{ "id":5 ,"name":"Damon" }
首先为employee.json创建DataFrame
import org.apache.spark.sql.SparkSession
val session = SparkSession.builder().appName("sql").master("local").getOrCreate()
val df = session.read.format("json").load("hdfs://namenode:9000/spark/spark-sql/employee.json")
然后进行以下操作
查询DataFrame的所有数据
//定义一个临时表people
df.createOrReplaceTempView("people")
spark.sql("select * from people").show
//或者
df.show()
查询所有数据,并去除重复的数据
spark.sql("select distinct(*) from people").show
//或者
df.distinct()
查询所有数据,打印时去除id字段
spark.sql("select age,name from people").show
df.drop("id").show()
筛选age>20的记录
spark.sql("select * from people where age>20").show
//或者
df.filter(df("age")>20).show()
说明:从这里往后,只给出代码
将数据按name分组
spark.sql("select name,count(*) from people group by name").show
//或者
df.groupBy("name").count().show()
将数据按name升序排列
spark.sql("select * from people order by name").show
//或者
df.sort(df("name")).show()
df.sort(df("name").asc).show()
取出前3行数据
spark.sql("select * from people where 1=1 limit 3").show
//或者
df.take(3).foreach(println)
df.head(3).foreach(println)
这里有点不同,用后面的方法得到的是数据,不是返回一个表结构
查询所有记录的name列,并为其取别名为username
spark.sql("select name usename from people").show
//或者
df.select(df("name").as("username")).show()
查询年龄age的平均值
spark.sql("select mean(age) from people").show
//或者
df.agg("age"->"avg").collect().foreach(println)
查询年龄age的最小值
spark.sql("select min(age) from people").show
//或者
df.agg("age"->"min").collect().foreach(println)
数据:employee.json
格式:
1,Ella,36
2,Bob,29
3,Jack,29
下面给出两种方式
//利用反射机制来实现包含特定类型对象的RDD的模式,适用于已知数据结构的RDD转换 import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession //定义一个case class,因为case class能被Spark隐式地转换为DataFrame //Spark SQL的Scala接口支持自动将包含RDD的案例类转换为DataFrame //case类定义表的模式,使用反射读取case类的参数名称,并成为列的名称 case class Employee(id:Long,name: String, age: Long) object RDDtoDF { def main(args: Array[String]) { val conf=new SparkConf().setAppName("RDDtoDF").setMaster("local") //SparkSession是Spark SQL 的入口,这里创建的spark是一个SparkSession对象 val spark= SparkSession.builder.appName("My Spark Application").config(conf).master("local").getOrCreate //这里的spark是我们上面定义的字面量,必须调入这个才能使用下面的.toDF方法 import spark.implicits._ //从文本文件创建Person对象的RDD,将其转换为DataFrame val employeeDF = spark.sparkContext.textFile("hdfs://namenode:9000/spark/spark-sql/employee.txt") .map(_.split(",")) .map(t => Employee(t(0).trim.toInt,t(1), t(2).trim.toInt)).toDF() //必须注册为临时表才能供下面的查询使用 employeeDF.createOrReplaceTempView("employee") val employeeRDD = spark.sql("select id,name,age from employee") employeeRDD.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() } }
运行结果
提交jar包
spark-submit --class RDDtoDF --master RDDtoDF.jar
运行结果
import org.apache.spark.SparkConf import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ object RDDtoDF_Two { def main(args: Array[String])= { val conf=new SparkConf().setAppName("RDDToDataFrame2").setMaster("local") val spark=SparkSession.builder().appName("RDDToDataFrame2").config(conf).getOrCreate() import spark.implicits._ //创建RDD val employeeRDD=spark.sparkContext.textFile("hdfs://namenode:9000/spark/spark-sql/employee.txt") val fields=Array(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)) val schema=StructType(fields) //将RDD的记录转换为行 val rowRDD=employeeRDD.map(_.split(",")).map(t=>Row(t(0).trim.toInt,t(1), t(2).trim.toInt)) //将模式应用于RDD val employeeDF=spark.createDataFrame(rowRDD,schema) //使用DataFrame创建临时视图 employeeDF.createOrReplaceTempView("employee") //SQL可以在使用DataFrames创建的临时视图上运行 val results = spark.sql("select id,name,age from employee") results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() } }
运行结果
提交jar包
spark-submit --class RDDtoDF_Two --master local RDDtoDF_Two.jar
运行结果
数据:在MySQL数据库中新建数据库sparktest,再建表employee,包含下列两行数据
id | name | gender | age |
---|---|---|---|
1 | Alice | F | 22 |
2 | John | M | 25 |
任务要求:配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,最后打印出age的最大值和age的总和
id | name | gender | age |
---|---|---|---|
3 | Marty | F | 26 |
4 | Tom | M | 23 |
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。