
Spark SQL Programming: Beginner Practice


References

Spark Programming: Basic Spark SQL Operations (2020.11.01) - CSDN blog

RDD Programming: Beginner Practice - CSDN blog

Installing Spark and Hadoop - CSDN blog

Spark SQL Programming: Beginner Practice - CSDN blog

1. Basic Spark SQL Operations

  1. { "id":1 , "name":" Ella" , "age":36 }
  2. { "id":2, "name":"Bob","age":29 }
  3. { "id":3 , "name":"Jack","age":29 }
  4. { "id":4 , "name":"Jim","age":28 }
  5. { "id":4 , "name":"Jim","age":28 }
  6. { "id":5 , "name":"Damon" }
  7. { "id":5 , "name":"Damon" }

Create the employee.json file

sudo vim employee.json
cat employee.json

Start spark-shell

cd /usr/local/spark/
./bin/spark-shell

1.1 Query all data

import spark.implicits._
val df = spark.read.json("file:///home/hadoop/下载/employee.json")
df.show()

import spark.implicits._ pulls in Spark's implicit conversions; among other things, it is what lets us convert an RDD into a DataFrame.

spark.read.json is the Spark method that reads data from a JSON file and loads it into a DataFrame.

df.show() displays the contents of the DataFrame.
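As a side note, the RDD-to-DataFrame conversion that this import enables is exposed through the toDF method. A minimal sketch (the sample tuples and column names below are chosen only for illustration):

import spark.implicits._
// After the import, an RDD of tuples (or case classes) gains a toDF method
val pairsRDD = spark.sparkContext.parallelize(Seq((1, "Ella", 36), (2, "Bob", 29)))
val pairsDF = pairsRDD.toDF("id", "name", "age")
pairsDF.show()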

1.2 Query all data, removing duplicate rows

df.distinct().show()

distinct() removes duplicate rows.
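distinct() compares whole rows. If you only need deduplication on particular columns, the related dropDuplicates method takes column names; a brief sketch:

// Keep one row per (name, age) pair, regardless of the other columns
df.dropDuplicates("name", "age").show()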

 

1.3 Query all data, dropping the id column when printing

df.drop(df("id")).show()

df.drop() removes the specified column from the DataFrame.
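drop also accepts the column name as a plain string, which is equivalent here:

// Same result as df.drop(df("id")).show()
df.drop("id").show()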

 

1.4 Filter the records with age > 30

df.filter(df("age")>30).show()

df.filter() keeps only the rows of the DataFrame that satisfy the given condition.
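filter also accepts a SQL-style condition string, and where is a synonym for filter; either form gives the same result:

df.filter("age > 30").show()
df.where(df("age") > 30).show()   // where is an alias for filter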

1.5 Group the data by age

df.groupBy(df("age")).count.show()

df.groupBy() groups the DataFrame by the given column; count() then counts the rows in each group, and show() displays the grouped result.
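When you want more than a plain row count per group, groupBy can be followed by agg with functions from org.apache.spark.sql.functions; a small sketch (the alias names are chosen only for illustration):

import org.apache.spark.sql.functions.{count, collect_list}
// For each age: how many rows it has, and which names fall into the group
df.groupBy("age").agg(count("*").as("cnt"), collect_list("name").as("names")).show()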

1.6 Sort the data by name in ascending order

df.sort(df("name").asc).show()

df.sort() sorts the rows of the DataFrame (ascending by default).

asc sorts ascending; desc sorts descending.

Here "Ella" sorts before "Bob" because the stored string is actually " Ella" with a leading space, so its first character is ' ' rather than 'E'. In ASCII, 'E' is 69, 'B' is 66, and ' ' is 32, so the space sorts first.
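If that stray leading space is unwanted, you can sort on a trimmed copy of the column instead; a sketch using the trim function:

import org.apache.spark.sql.functions.trim
// Compare " Ella" as "Ella" when ordering
df.sort(trim(df("name")).asc).show()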

 

1.7 Take the first 3 rows

df.show(3)

df.show(n) displays the first n rows of the DataFrame (if n exceeds the number of rows, all rows are printed).
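show(n) only prints the rows; to actually bring them back to the driver you can use head(n) or take(n), which return an Array[Row]:

val firstThree = df.head(3)   // at most 3 rows as Array[Row]
firstThree.foreach(println)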

1.8 Select the name column of every record, aliased as username

df.select(df("name").as("username")).show()

df.select() selects the specified columns of the DataFrame.
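The same renaming can be written with selectExpr using SQL syntax, or with withColumnRenamed if the other columns should be kept:

df.selectExpr("name as username").show()
df.withColumnRenamed("name", "username").show()   // keeps id and age as well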

 

1.9 Compute the average of age

df.agg("age"->"avg").show()

df.agg() performs an aggregation over the DataFrame; avg computes the average.

1.10 Compute the minimum of age

df.agg("age"->"min").show()

min computes the minimum.
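Both aggregations (and others such as max or sum) can also be combined in a single agg call using the functions API; a short sketch:

import org.apache.spark.sql.functions.{avg, min, max}
df.agg(min("age"), avg("age"), max("age")).show()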

2. Converting an RDD to a DataFrame programmatically

1,Ella,36
2,Bob,29
3,Jack,29

Create the project

sudo mkdir -p /example/sparkapp6/src/main/scala
cd /example/sparkapp6/src/main/scala

Create employee.txt

sudo vim employee.txt
cat employee.txt

Create the Scala file

sudo vim ./SimpleApp.scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("MyApp")
      .getOrCreate()
    // Needed for the Dataset map(...).show() below; spark-shell imports this
    // automatically, but a standalone application must do it explicitly.
    import spark.implicits._
    val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
    val schemaString = "id name age"
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.createOrReplaceTempView("people")
    val results = spark.sql("SELECT id,name,age FROM people")
    results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
    spark.stop()
  }
}

This code did not run successfully for me; read on for the version that did.

Create the .sbt file

sudo vim build.sbt

The dependencies needed here have changed from earlier exercises; the build will fail if you do not update them.

name := "Simple Project"
version := "1.0"
scalaVersion := "2.13.13"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"

Package and run

/usr/local/sbt/sbt package
spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar

Run directly in spark-shell (this is the version that ran successfully)

cd /usr/local/spark/
./bin/spark-shell

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT id,name,age FROM people")
results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
val peopleRDD: org.apache.spark.rdd.RDD[String] = file:///example/sparkapp6/src/main/scala/employee.txt MapPartitionsRDD[10] at textFile at <console>:1

scala> val schemaString = "id name age"
val schemaString: String = id name age

scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(age,StringType,true))

scala> val schema = StructType(fields)
val schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true),StructField(name,StringType,true),StructField(age,StringType,true))

scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
val rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at map at <console>:1

scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
val peopleDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

scala> peopleDF.createOrReplaceTempView("people")

scala> val results = spark.sql("SELECT id,name,age FROM people")
val results: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]

scala> results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
+--------------------+
|               value|
+--------------------+
|id: 1,name:Ella,a...|
|id: 2,name:Bob,ag...|
|id: 3,name:Jack,a...|
+--------------------+
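For comparison, the same RDD-to-DataFrame conversion can be done without building the schema by hand, by mapping the rows to a case class and calling toDF (the reflection-based approach). A minimal sketch for spark-shell, reusing the same employee.txt; the Employee case class and the people2 view name are chosen here only for illustration:

case class Employee(id: String, name: String, age: String)
import spark.implicits._
val employeeDF = spark.sparkContext
  .textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
  .map(_.split(","))
  .map(attrs => Employee(attrs(0), attrs(1).trim, attrs(2).trim))
  .toDF()
employeeDF.createOrReplaceTempView("people2")
spark.sql("SELECT id, name, age FROM people2").show()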

3. Reading and writing MySQL data with a DataFrame

Install MySQL Connector/J (the JDBC driver)

MySQL :: Download MySQL Connector/J (Archived Versions)

sudo tar -zxf ./mysql-connector-java-5.1.40.tar.gz -C /usr/local
cd /usr/local/
mv mysql-connector-java-5.1.40/ mysql

3.1 Create the database sparktest in MySQL, then create the employee table

su root
service mysql start
mysql -u root -p

Create the database

create database sparktest;

 

Create the table

use sparktest;
create table employee(id int(4),name char(20),gender char(4),Age int(4));

mysql> use sparktest;
Database changed
mysql> create table employee(id int(4),name char(20),gender char(4),Age int(4));
Query OK, 0 rows affected, 2 warnings (0.02 sec)

 

Insert data

insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
select * from employee;

mysql> insert into employee values(1,'Alice','F',22);
Query OK, 1 row affected (0.01 sec)
mysql> insert into employee values(2,'John','M',25);
Query OK, 1 row affected (0.01 sec)
mysql> select * from employee;
+------+-------+--------+------+
| id   | name  | gender | Age  |
+------+-------+--------+------+
|    1 | Alice | F      |   22 |
|    2 | John  | M      |   25 |
+------+-------+--------+------+
2 rows in set (0.00 sec)

 

3.2 Configure Spark to connect to MySQL over JDBC and insert data using a DataFrame
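A minimal sketch of this step, run in spark-shell started with the Connector/J jar on the classpath (for example with --jars and --driver-class-path pointing at /usr/local/mysql/mysql-connector-java-5.1.40-bin.jar). The jar path, the sample rows, and the yourpassword placeholder below are assumptions for illustration, not taken from the original:

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Two new rows to append to sparktest.employee (values chosen for illustration)
val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26", "4 Tom M 23")).map(_.split(" "))
val schema = StructType(List(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("gender", StringType, true),
  StructField("Age", IntegerType, true)))
val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
val employeeDF = spark.createDataFrame(rowRDD, schema)

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "yourpassword")      // placeholder: replace with your MySQL root password
prop.put("driver", "com.mysql.jdbc.Driver")

// Append the new rows to the employee table created in 3.1
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "employee", prop)

// Read the table back to confirm the insert
spark.read.jdbc("jdbc:mysql://localhost:3306/sparktest", "employee", prop).show()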
