Spark Programming: Spark SQL Basic Operations (2020.11.01)
- { "id":1 , "name":" Ella" , "age":36 }
- { "id":2, "name":"Bob","age":29 }
- { "id":3 , "name":"Jack","age":29 }
- { "id":4 , "name":"Jim","age":28 }
- { "id":4 , "name":"Jim","age":28 }
- { "id":5 , "name":"Damon" }
- { "id":5 , "name":"Damon" }
Create the employee.json file
- sudo vim employee.json
- cat employee.json
Start spark-shell
- cd /usr/local/spark/
- ./bin/spark-shell
- import spark.implicits._
- val df = spark.read.json("file:///home/hadoop/下载/employee.json")
- df.show()
import spark.implicits._ is a Spark helper import that, among other things, lets us convert RDDs into DataFrames.
spark.read.json is an Apache Spark method that reads data from a JSON file and loads it into a DataFrame.
df.show() displays the contents of the DataFrame.
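As a quick sanity check, printSchema() shows the schema Spark inferred from the JSON. A minimal sketch, assuming Spark's default inference (JSON columns come back in alphabetical order, with numeric fields as long):
- // Inspect the inferred schema: id and age come back as long, name as string.
- df.printSchema()
- // root
- //  |-- age: long (nullable = true)
- //  |-- id: long (nullable = true)
- //  |-- name: string (nullable = true)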
df.distinct().show()
distinct() removes duplicate rows.
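distinct() compares whole rows; to de-duplicate on specific columns only, dropDuplicates() takes column names. A small sketch, de-duplicating on id alone (with this data it collapses the two id-4 and id-5 rows):
- // Keep one row per distinct id, regardless of the other columns.
- df.dropDuplicates(Seq("id")).show()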
df.drop(df("id")).show()
df.drop() removes the specified column from the DataFrame.
df.filter(df("age")>30).show()
df.filter() filters the rows of a DataFrame by the given condition.
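The same predicate can also be written as a SQL expression string, and where() is an alias for filter(); two equivalent sketches:
- // SQL-expression form of the same filter.
- df.filter("age > 30").show()
- // where() behaves exactly like filter().
- df.where(df("age") > 30).show()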
df.groupBy(df("age")).count.show()
df.groupBy() groups the DataFrame by the specified column.
The chained count() then counts the rows in each group and returns a new DataFrame, which show() displays. (A bare df.count() returns a Long row count and has no show() method.)
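The chained count is shorthand; the same grouping written through agg() lets you name the result column. A sketch using the built-in count function:
- import org.apache.spark.sql.functions.count
- // Same per-age counts, but with the count column renamed to "cnt".
- df.groupBy(df("age")).agg(count("*").as("cnt")).show()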
df.sort(df("name").asc).show()
df.sort() sorts the rows of the DataFrame (ascending by default).
asc: ascending
desc: descending
Here "Ella" sorts before "Bob" because the stored string is actually " Ella": its first character is a space, not 'E'. In ASCII, 'E' is 69, 'B' is 66, and the space is 32, so the space-prefixed string comes first.
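If the leading space is unwanted, you can sort on the trimmed value instead. A small sketch using the built-in trim() function:
- import org.apache.spark.sql.functions.trim
- // Sort by name with leading/trailing whitespace removed.
- df.sort(trim(df("name")).asc).show()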
df.show(3)
df.show(n) displays the first n rows of the DataFrame. (If n exceeds the number of rows, it simply prints them all.)
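show() also truncates cell values longer than 20 characters by default; a small sketch that displays them in full:
- // truncate = false prints full cell contents instead of cutting them at 20 chars.
- df.show(3, truncate = false)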
df.select(df("name").as("username")).show()
df.select() selects the specified columns from the DataFrame; as() renames a column in the output.
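select() accepts several columns at once, including computed expressions. A sketch (the alias agePlusOne is just an illustrative name):
- // Select a renamed column together with a derived one.
- df.select(df("name").as("username"), (df("age") + 1).as("agePlusOne")).show()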
df.agg("age"->"avg").show()
df.agg() performs aggregate operations on the DataFrame.
avg: average.
df.agg("age"->"min").show()
min: minimum.
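The "column" -> "function" pairs above are shorthand; the equivalent functions API can compute several aggregates in one pass. A sketch:
- import org.apache.spark.sql.functions.{avg, min}
- // Average and minimum age in a single aggregation.
- df.agg(avg("age"), min("age")).show()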
- 1,Ella,36
- 2,Bob,29
- 3,Jack,29
Create the project (the three CSV lines above are the employee.txt contents used below)
- sudo mkdir -p /example/sparkapp6/src/main/scala
- cd /example/sparkapp6/src/main/scala
Create employee.txt
- sudo vim employee.txt
- cat employee.txt
Create the Scala file
- sudo vim ./SimpleApp.scala
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.SparkSession
-
- object SimpleApp {
-   def main(args: Array[String]): Unit = {
-     val spark = SparkSession.builder
-       .appName("MyApp")
-       .getOrCreate()
-     // Needed for the implicit Encoder used by results.map below;
-     // its absence is the likely reason the original build failed.
-     import spark.implicits._
-     // Read the raw text file as an RDD of lines.
-     val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
-     // Describe a schema of three nullable string columns: id, name, age.
-     val schemaString = "id name age"
-     val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
-     val schema = StructType(fields)
-     // Split each CSV line and wrap the parts in a Row matching the schema.
-     val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
-     val peopleDF = spark.createDataFrame(rowRDD, schema)
-     // Register a temporary view so the data can be queried with SQL.
-     peopleDF.createOrReplaceTempView("people")
-     val results = spark.sql("SELECT id,name,age FROM people")
-     results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
-   }
- }
This code didn't build for me as originally written; the likely culprit is that results.map needs an implicit Encoder, which comes from import spark.implicits._ (added above with a comment). Keep reading below for the approach that did work.
Create the build.sbt file
- sudo vim build.sbt
The dependencies required here have changed; the build fails if you don't update them. (The Spark version must match your installation, and the Scala version must be one your Spark build supports; Spark 3.5.1 is published for Scala 2.12 and 2.13.)
- name := "Simple Project"
- version := "1.0"
- scalaVersion := "2.13.13"
- libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
- libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
Package and run
- /usr/local/sbt/sbt package
- spark-submit --class "SimpleApp" ./target/scala-2.13/simple-project_2.13-1.0.jar
Start spark-shell directly (this is the run that succeeded)
- cd /usr/local/spark/
- ./bin/spark-shell
- import org.apache.spark.sql.types._
- import org.apache.spark.sql.Row
-
- val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
- val schemaString = "id name age"
- val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
- val schema = StructType(fields)
- val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
- val peopleDF = spark.createDataFrame(rowRDD, schema)
- peopleDF.createOrReplaceTempView("people")
- val results = spark.sql("SELECT id,name,age FROM people")
- results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
The full spark-shell session, with the REPL's output:
- scala> import org.apache.spark.sql.types._
- import org.apache.spark.sql.types._
-
- scala> import org.apache.spark.sql.Row
- import org.apache.spark.sql.Row
-
- scala> val peopleRDD = spark.sparkContext.textFile("file:///example/sparkapp6/src/main/scala/employee.txt")
- val peopleRDD: org.apache.spark.rdd.RDD[String] = file:///example/sparkapp6/src/main/scala/employee.txt MapPartitionsRDD[10] at textFile at <console>:1
-
- scala> val schemaString = "id name age"
- val schemaString: String = id name age
-
- scala> val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
- val fields: Array[org.apache.spark.sql.types.StructField] = Array(StructField(id,StringType,true), StructField(name,StringType,true), StructField(age,StringType,true))
-
- scala> val schema = StructType(fields)
- val schema: org.apache.spark.sql.types.StructType = StructType(StructField(id,StringType,true),StructField(name,StringType,true),StructField(age,StringType,true))
-
- scala> val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim))
- val rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[12] at map at <console>:1
-
- scala> val peopleDF = spark.createDataFrame(rowRDD, schema)
- val peopleDF: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
-
- scala> peopleDF.createOrReplaceTempView("people")
-
- scala> val results = spark.sql("SELECT id,name,age FROM people")
- val results: org.apache.spark.sql.DataFrame = [id: string, name: string ... 1 more field]
-
- scala> results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show()
- +--------------------+
- | value|
- +--------------------+
- |id: 1,name:Ella,a...|
- |id: 2,name:Bob,ag...|
- |id: 3,name:Jack,a...|
- +--------------------+
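The values in the output above are truncated to 20 characters; passing false to show() prints them in full. A small sketch:
- // show(false) disables truncation of long cell values.
- results.map(attributes => "id: " + attributes(0) + "," + "name:" + attributes(1) + "," + "age:" + attributes(2)).show(false)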
Install MySQL Connector/J and start MySQL
Download the connector from: MySQL :: Download MySQL Connector/J (Archived Versions)
- sudo tar -zxf ./mysql-connector-java-5.1.40.tar.gz -C /usr/local
- cd /usr/local/
- mv mysql-connector-java-5.1.40/ mysql
- su root
- service mysql start
- mysql -u root -p
Create the database
- create database sparktest;
Create the table
- use sparktest;
- create table employee(id int(4),name char(20),gender char(4),Age int(4));
- mysql> use sparktest;
- Database changed
- mysql> create table employee(id int(4),name char(20),gender char(4),Age int(4));
- Query OK, 0 rows affected, 2 warnings (0.02 sec)
Insert data
- insert into employee values(1,'Alice','F',22);
- insert into employee values(2,'John','M',25);
- select * from employee;
- mysql> insert into employee values(1,'Alice','F',22);
- Query OK, 1 row affected (0.01 sec)
-
- mysql> insert into employee values(2,'John','M',25);
- Query OK, 1 row affected (0.01 sec)
-
- mysql> select * from employee;
-
- +------+-------+--------+------+
- | id | name | gender | Age |
- +------+-------+--------+------+
- | 1 | Alice | F | 22 |
- | 2 | John | M | 25 |
- +------+-------+--------+------+
- 2 rows in set (0.00 sec)
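With the connector jar on Spark's classpath (e.g. starting the shell with --jars /usr/local/mysql/mysql-connector-java-5.1.40-bin.jar), the table can be read back into a Spark DataFrame over JDBC. A minimal sketch; the URL, user, and password value are placeholders for your own setup:
- // "yourpassword" is a placeholder -- substitute your own credentials.
- val jdbcDF = spark.read.format("jdbc").
-   option("url", "jdbc:mysql://localhost:3306/sparktest").
-   option("driver", "com.mysql.jdbc.Driver").
-   option("dbtable", "employee").
-   option("user", "root").
-   option("password", "yourpassword").
-   load()
- jdbcDF.show()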