当前位置:   article > 正文

Spark SQL编程_编写一个 sql 查询,计算表中员工的平均年龄

编写一个 sql 查询,计算表中员工的平均年龄

1、Spark SQL基本操作

(一)创建DataFrame

(二)查询所有数据

 

(三)查询所有数据并去除重复的数据

 

(四)查询所有数据打印时去除id字段

 

(五)筛选出age>30的记录

 

(六)将数据按age分组

 

(七)将数据按name升序排序 

(八)取出前3行数据

 

(九)查询所有记录的name列,并为其取别名为username

 

(十)查询年龄age的平均值

 

(十一)查询年龄age的最小值 

2、编程实现将RDD转换为DataFrame

(一) 代码编写

  1. import org.apache.spark.sql.types._
  2. import org.apache.spark.sql.Row
  3. val peopleRDD = spark.sparkContext.textFile("file:///root/wyqWork/Demo6/employee.txt")
  4. val schemaString = "id name age"
  5. val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
  6. val schema = StructType(fields)
  7. val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
  8. val peopleDF = spark.createDataFrame(rowRDD, schema)
  9. peopleDF.createOrReplaceTempView("people")
  10. val results = spark.sql("SELECT id,name,age FROM people")
  11. results.map(attributes => "id: " + attributes(0)+","+"name:"+attributes(1)+","+"age:"+attributes(2)).show()

 (二)结果输出

3、利用DataFrame读写MySQL的数据

(一)新建数据库sparktest,再创建表employee,并输入数据

 

 

(二)查询当前表中所有数据

 

4、配置Sp a r k通过JDBC连接数据库MySQL,编程实现利用Da t aFr a m e插入如表6 -2所示的两行数据到MySQL中,最后打印出a g e的最大值和a g e的总和。

(一)代码编写

  1. import java.util.Properties
  2. import org.apache.spark.sql.types._
  3. import org.apache.spark.sql.Row
  4. import org.apache.spark.sql.SparkSession
  5. val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
  6. import spark.implicits._
  7. val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
  8. val schema=StructType(List(StructField("id",IntegerType,
  9. true),StructField("name",StringType,true),StructField("gender",StringType,true),
  10. StructField("age",IntegerType,true)))
  11. val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
  12. val employeeDF=spark.createDataFrame(rowRDD,schema)
  13. val prop=new Properties()
  14. prop.put("user","root")
  15. prop.put("password","10086CYc#")
  16. prop.put("driver","com.mysql.cj.jdbc.Driver")
  17. employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
  18. val jdbcDF = spark.read.format("jdbc").
  19. option("url","jdbc:mysql://localhost:3306/sparktest").
  20. option("driver","com.mysql.cj.jdbc.Driver").
  21. option("dbtable", "employee").
  22. option("user", "root").
  23. option("password", "10086CYc#").
  24. load()
  25. jdbcDF.agg("age" -> "max", "age" -> "sum").show()

 

 (二)结果输出

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/663310
推荐阅读
相关标签
  

闽ICP备14008679号