当前位置:   article > 正文

spark结课tip3

spark结课tip3

目录

spark SQL语言:

1.show():

基础代码:

效果展示:

2.printSchema()

基础代码:

效果展示:

3.select()

基础代码:

效果展示:

4.filter()

基础代码:

效果展示:

5.groupBy()

基础代码:

效果展示:

6.sort()

基础代码:

效果展示:


spark SQL语言:

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。使用Spark SQL有两种方式,包括SQL语句以及Dataset API。Spark SQL的一个主要的功能就是执行SQL查询语句。Spark SQL也可以用来从Hive中查询数据。当我们使用某种编程语言开发的Spark作业来执行SQL时,返回的结果是Dataframe/Dataset类型的。当然,我们也可以通过Spark SQL的shell命令行工具,或者是JDBC/ODBC接口来访问。

Dataframe的常用方法,具体如下如示

1.show():

查看DataFrame中的具体内容信息,这将会打印DataFrame df的前20行数据

基础代码:
 
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.sql.SparkSession
  3. object Y1{
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local")
  6. val sc = new SparkContext(conf)
  7. val spark = SparkSession.builder
  8. .appName("Spark SQL show example")
  9. .master("local[*]") // 或者你的Spark集群 URL
  10. .getOrCreate()
  11. import spark.implicits._
  12. // 示例数据
  13. val people = Seq(
  14. (1, "Alice", 34),
  15. (2, "Bob", 42),
  16. (3, "Charlie", 51)
  17. )
  18. // 将示例数据转换为DataFrame
  19. val df = people.toDF("id", "name", "age")
  20. df.show()
  21. sc.stop()
  22. }
  23. }
效果展示:

2.printSchema()

查看DataFrame的Schema信息

基础代码:
 
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.sql.SparkSession
  3. object Y1{
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local")
  6. val sc = new SparkContext(conf)
  7. val spark = SparkSession.builder
  8. .appName("Spark SQL show example")
  9. .master("local[*]") // 或者你的Spark集群 URL
  10. .getOrCreate()
  11. import spark.implicits._
  12. // 示例数据
  13. val people = Seq(
  14. (1, "Alice", 34),
  15. (2, "Bob", 42),
  16. (3, "Charlie", 51)
  17. )
  18. // 将示例数据转换为DataFrame
  19. val df = people.toDF("id", "name", "age")
  20. df.printSchema()
  21. sc.stop()
  22. }
  23. }
效果展示:

3.select()

查看DataFrame中选取部分列的数据及进行重命名

基础代码:
  1. import org.apache.spark.{SparkConf, SparkContext}
  2. import org.apache.spark.sql.SparkSession
  3. object Y1{
  4. def main(args: Array[String]): Unit = {
  5. val conf = new SparkConf().setAppName("ParallelizeExample").setMaster("local")
  6. val sc = new SparkContext(conf)
  7. val spark = SparkSession.builder
  8. .appName("Spark SQL show example")
  9. .master("local[*]") // 或者你的Spark集群 URL
  10. .getOrCreate()
  11. import spark.implicits._
  12. // 示例数据
  13. val peopleDF = Seq(
  14. ("Alice", 30, "USA"),
  15. ("Bob", 20, "Canada"),
  16. ("Charlie", 40, "UK")
  17. ).toDF("name", "age", "country")
  18. val selectedDF = peopleDF.select("name", "age")
  19. // 显示结果
  20. selectedDF.show()
  21. sc.stop()
  22. }
  23. }

 
效果展示:

4.filter()

实现条件查询,过滤出想要的结果

基础代码:
  1. import org.apache.spark.sql.SparkSession
  2. object Y1 {
  3. case class Person(name: String, age: Int)
  4. def main(args: Array[String]): Unit = {
  5. val spark = SparkSession.builder()
  6. .appName("Spark SQL Demo")
  7. .master("local[*]")
  8. .getOrCreate()
  9. import spark.implicits._
  10. // 创建Person案例类的DataFrame对象
  11. val personsDF = Seq(
  12. Person("Alice", 25),
  13. Person("Bob", 30),
  14. Person("Charlie", 35)
  15. ).toDF()
  16. // 使用filter()方法过滤出年龄大于等于30的人
  17. val filteredDF = personsDF.filter($"age" >= 30)
  18. // 显示结果
  19. filteredDF.show()
  20. // 关闭SparkSession
  21. spark.stop()
  22. }
  23. }

 
效果展示:

5.groupBy()

对记录进行分组

基础代码:
 
  1. import org.apache.spark.sql.SparkSession
  2. object SparkSQLDemo {
  3. case class Person(name: String, age: Int, country: String)
  4. def main(args: Array[String]): Unit = {
  5. val spark = SparkSession.builder()
  6. .appName("Spark SQL Demo")
  7. .master("local[*]")
  8. .getOrCreate()
  9. import spark.implicits._
  10. // 创建Person案例类的DataFrame对象
  11. val personsDF = Seq(
  12. Person("Alice", 25, "USA"),
  13. Person("Bob", 30, "Canada"),
  14. Person("Charlie", 35, "USA"),
  15. Person("Dave", 40, "Canada")
  16. ).toDF()
  17. // 使用groupBy()方法按照国家分组,并计算每个国家的人数
  18. val groupedDF = personsDF.groupBy($"country").count()
  19. // 显示结果
  20. groupedDF.show()
  21. // 关闭SparkSession
  22. spark.stop()
  23. }
  24. }
效果展示:

6.sort()

对特定字段进行排序操作

基础代码:
 
  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.spark.sql.functions._
  3. object Y1 {
  4. case class Person(name: String, age: Int)
  5. def main(args: Array[String]): Unit = {
  6. val spark = SparkSession.builder()
  7. .appName("Spark SQL Demo")
  8. .master("local[*]")
  9. .getOrCreate()
  10. import spark.implicits._
  11. // 创建Person案例类的DataFrame对象
  12. val personsDF = Seq(
  13. Person("Alice", 25),
  14. Person("Bob", 30),
  15. Person("Charlie", 35)
  16. ).toDF()
  17. // 使用sort()方法按照年龄升序排序
  18. val sortedDF = personsDF.sort(asc("age"))
  19. // 显示结果
  20. sortedDF.show()
  21. // 关闭SparkSession
  22. spark.stop()
  23. }
  24. }
效果展示:

新人作者,如果有什么需要改进的地方,请多多指教! 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/592690
推荐阅读
相关标签
  

闽ICP备14008679号