Spark Core (review)
1. RDD: Resilient Distributed Dataset, Spark's basic unit of data storage
2. Master and Worker: the master and worker roles of a Spark standalone cluster
3. DAG and lineage: directed acyclic graph and dependency lineage
4. Programming entry point: SparkContext
5. Transformation and action operators
6. Workflow diagram
7. Shared variables: broadcast variables and accumulators
8. Custom partitioners

Spark SQL (this section)
1. DataFrame and DataSet
2. RDD / DF / DS conversions
3. SparkSession
4. SQL -> spark.sql(""" """)
5. Integrating Spark SQL with Hive
Spark SQL, as the name suggests, is the SQL-based computation module in the Spark ecosystem, built on top of Spark Core.
Its predecessor was Shark, which was originally created to optimize the code underneath Spark Core and to integrate with Hive. As it evolved, Shark became faster than Hive, and Hive's development pace increasingly constrained Shark's. So in 2014 the Shark project lead terminated Shark and spun off an independent new project: Spark SQL. As Spark SQL continued to develop, two separate lines of work gradually took shape: Spark SQL and Hive-on-Spark.
Spark SQL provides Spark's second- and third-generation programming models, DataFrame and Dataset. In addition, it lets you write Spark programs directly in SQL.
RDD: Resilient Distributed Dataset. See the Spark Core notes for details.
[Figure 001.png]
You can think of a DataFrame as an RDD with structure: an RDD is just a distributed data set, while a DataFrame adds a schema (column names and types, i.e. a table header) on top of it.
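A minimal sketch of that difference (the column names and sample rows here are made up for illustration):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("df-vs-rdd").master("local[*]").getOrCreate()
import spark.implicits._

// The same records as a plain RDD: just tuples, no column metadata
val rdd = spark.sparkContext.parallelize(Seq(("tom", 20), ("jerry", 18)))

// Adding column names turns it into a DataFrame with a schema
val df: DataFrame = rdd.toDF("name", "age")
df.printSchema()   // shows name: string, age: int
df.show()

spark.stop()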
[Figure 002.png]
Dataset is a model that has been available since Spark 1.6.
[Figure 003.png]
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>${spark-version}</version>
</dependency>
val spark: SparkSession = SparkSession.builder()
.appName("demo1")
.master("local[*]")
// .enableHiveSupport() // enable Hive support
.getOrCreate()
package com.qf.bigdata.spark.core.day3

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Custom Spark utility object
 */
object SparkUtils {

  /**
   * Get a local SparkContext with a default app name
   */
  def getLocalSparkContext(): SparkContext = getLocalSparkContext("default_app")

  /**
   * Get a local SparkContext
   */
  def getLocalSparkContext(appName: String): SparkContext = getSparkContext("local[*]", appName)

  /**
   * Get a SparkContext
   */
  def getSparkContext(master: String, appName: String): SparkContext =
    new SparkContext(master, appName, new SparkConf())

  def getLocalSparkSession(): SparkSession = getLocalSparkSession("default_app", false)

  def getLocalSparkSession(appName: String, isSupportedHive: Boolean = false): SparkSession =
    getSparkSession("local[*]", appName, isSupportedHive)

  def getSparkSession(master: String, appName: String, isSupportedHive: Boolean = false): SparkSession = {
    if (isSupportedHive)
      SparkSession.builder().appName(appName).master(master).enableHiveSupport().getOrCreate()
    else
      SparkSession.builder().appName(appName).master(master).getOrCreate()
  }

  /**
   * Release resources
   */
  def close(sc: SparkContext): Unit = if (sc != null && !sc.isStopped) sc.stop()

  def close(spark: SparkSession): Unit = if (spark != null && !spark.sparkContext.isStopped) spark.stop()
}
package com.qf.bigdata.spark.sql.day1

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo1 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    //1. SparkSession
    val spark: SparkSession = SparkUtils.getLocalSparkSession()

    //2. Load data: DataFrame/Dataset API
    val df: DataFrame = spark.read.json("C:\\ftp\\person.json")
    //2.1 Print the schema
    df.printSchema()
    //2.2 Show the table contents
    df.show()
    //2.3 Select columns: name, gender
    df.select("name", "gender").show()
    //2.4 age + 1
    import spark.implicits._ // import the implicit conversions
    df.select($"name", $"age" + 1).show()
    //2.5 Alias
    df.select($"name", ($"age" + 1).as("age")).show()
    //2.6 Aggregation: select count(age) as count from xxx group by age
    df.select($"age").groupBy($"age").count().as("count").show()
    //2.7 Conditional query
    df.select($"name", $"gender", $"age").where($"age" > 30).show()

    //3. SQL
    //3.1 Create a temporary view
    /**
     * createOrReplaceTempView
     * createOrReplaceGlobalTempView
     * createTempView
     * createGlobalTempView
     *
     * 1. Scope:
     *    The "Global" variants are visible across the whole Spark application;
     *    the non-global variants are only visible in the current SparkSession.
     *
     * 2. Creation semantics:
     *    The "Replace" variants replace the view if one with the same name already exists,
     *    otherwise they create it.
     *    The non-"Replace" variants create the view and throw an error if it already exists.
     */
    df.createOrReplaceTempView("user") // temporary view

    //3.2 Query with SQL
    spark.sql(
      s"""
         |select name, gender, age from user where age > 30
         |""".stripMargin).show()

    // Release resources
    SparkUtils.close(spark)
  }
}
Reflection essentially means first obtaining the Class object from the method area, and then using that Class object in reverse to create instances of the class and access its members.
[Figure 004.png]
package com.qf.bigdata.reflect;

public class Person {
    private int age;
    public String name;

    public Person() {}

    public Person(String name) {
        this.name = name;
    }

    // getter relied on by the later reflection demos (Demo6 calls getAge())
    public int getAge() {
        return age;
    }

    public void showInfo() {
        System.out.println(name + "_" + age);
    }

    private void speak() {
        System.out.println("今天不上晚自习是不可能的");
    }
}
package com.qf.bigdata.reflect;

public class Demo1 {
    public static void main(String[] args) {
        //1. First way: ClassName.class
        Class<Person> clazz = Person.class;

        //2. Second way: object.getClass()
        Class<? extends Person> clazz2 = new Person().getClass();

        //3. Third way: the static Class.forName method
        try {
            Class<?> clazz3 = Class.forName("com.qf.bigdata.reflect.Person"); // package.ClassName
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
package com.qf.bigdata.reflect;

public class Demo2 {
    public static void main(String[] args) {
        //1. Get the Class object
        Class<Person> clazz = null;
        try {
            clazz = (Class<Person>) Class.forName("com.qf.bigdata.reflect.Person"); // package.ClassName
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        //2. Create an instance from the Class object
        try {
            Person person = clazz.newInstance();
            System.out.println(person);
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
    }
}
package com.qf.bigdata.reflect;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

public class Demo3 {
    public static void main(String[] args) {
        //1. Get the Class object
        Class<Person> clazz = null;
        try {
            clazz = (Class<Person>) Class.forName("com.qf.bigdata.reflect.Person"); // package.ClassName
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }

        //2. Get the constructor that takes a String; String.class = Class<String>
        Constructor<Person> constructor = null;
        try {
            constructor = clazz.getConstructor(String.class);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
        }

        //3. Create an instance through the constructor
        Person lixi = null;
        try {
            lixi = constructor.newInstance("李茜");
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
        System.out.println(lixi.name);
    }
}
package com.qf.bigdata.reflect;

import java.lang.reflect.Field;

public class Demo4 {
    public static void main(String[] args) throws ClassNotFoundException, NoSuchFieldException, IllegalAccessException {
        Person p = new Person();
        System.out.println(p.name);

        Class<Person> clazz = (Class<Person>) Class.forName("com.qf.bigdata.reflect.Person");

        //1. Get the name field
        Field field = clazz.getField("name");
        //2. Assign a value to the name field
        field.set(p, "李茜");
        //3. Print
        System.out.println(p.name);
    }
}
package com.qf.bigdata.reflect;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class Demo5 {
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Person p = new Person();
Class<Person> clazz = (Class<Person>) Class.forName("com.qf.bigdata.reflect.Person");
//1. Get the Method object
Method method = clazz.getMethod("showInfo");
//2. Invoke the method on this object
method.invoke(p);
}
}
package com.qf.bigdata.reflect;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class Demo6 {
    public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException {
        Person p = new Person();
        System.out.println(p.getAge());

        Class<Person> clazz = (Class<Person>) Class.forName("com.qf.bigdata.reflect.Person");

        // Field field = clazz.getField("age"); // getField only sees public fields
        Field field = clazz.getDeclaredField("age");
        field.setAccessible(true); // make the private field accessible

        // Method method = clazz.getMethod("speak"); // getMethod only sees public methods
        Method method = clazz.getDeclaredMethod("speak");
        method.setAccessible(true); // make the private speak method accessible
        method.invoke(p);

        //2. Assign a value to the age field
        field.set(p, 11);
        //3. Print
        System.out.println(p.getAge());
    }
}
package com.qf.bigdata.spark.sql.day2

import java.util

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.beans.BeanProperty
import scala.collection.{JavaConversions, JavaConverters}

object Demo1 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Build a Scala list of data
    val list = List(
      new Student(1, "王东阳", "男", 18),
      new Student(2, "王西阳", "女", 19),
      new Student(3, "王北阳", "男", 20),
      new Student(4, "王南阳", "女", 21)
    )

    //2. Convert the Scala list to a java.util.List
    // val stuList: util.List[Student] = JavaConversions.seqAsJavaList(list) // scala 2.11.8
    val stuList: util.List[Student] = JavaConverters.seqAsJavaList(list) // scala 2.12.8

    //3. Create the DataFrame; the list must be a java.util.List
    val df: DataFrame = spark.createDataFrame(stuList, classOf[Student]) // Student.class

    //4. Print
    df.show()

    SparkUtils.close(spark)
  }
}

class Student {
  @BeanProperty var id: Int = _
  @BeanProperty var name: String = _
  @BeanProperty var gender: String = _
  @BeanProperty var age: Int = _

  def this(id: Int, name: String, gender: String, age: Int) = {
    this()
    this.id = id
    this.name = name
    this.gender = gender
    this.age = age
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo2 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Build the data
    val rowRDD: RDD[Row] = spark.sparkContext.parallelize(List(
      Row(1, "令狐冲", "华山派"),
      Row(2, "任盈盈", "日月神教"),
      Row(3, "岳灵珊", "华山派"),
      Row(4, "田伯光", "散人")
    ))

    //2. Build the schema
    val schema: StructType = StructType(List(
      StructField("id", DataTypes.IntegerType),
      StructField("name", DataTypes.StringType),
      StructField("class", DataTypes.StringType)
    ))

    //3. RDD + schema == DataFrame
    val df: DataFrame = spark.createDataFrame(rowRDD, schema)
    df.show()

    SparkUtils.close(spark)
  }
}
Dataset is the upgraded version of DataFrame (a strongly typed DataFrame).
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.{Dataset, SparkSession}

object Demo3 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val ds: Dataset[Stu] = spark.createDataset(List(
      Stu(1, "王东阳", "男", 18),
      Stu(2, "王西阳", "女", 19),
      Stu(3, "王北阳", "男", 20),
      Stu(4, "王南阳", "女", 21)
    ))
    ds.show()

    SparkUtils.close(spark)
  }
}

case class Stu(id: Int, name: String, gender: String, age: Int)
package com.qf.bigdata.spark.sql.day2

import java.util

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import scala.beans.BeanProperty
import scala.collection.JavaConverters

object Demo4 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Build a Scala list of data
    val list = List(
      new Student(1, "王东阳", "男", 18),
      new Student(2, "王西阳", "女", 19),
      new Student(3, "王北阳", "男", 20),
      new Student(4, "王南阳", "女", 21)
    )

    //2. RDD
    val stuRDD: RDD[Student] = spark.sparkContext.parallelize(list)

    //3. Create the DataFrame
    val df: DataFrame = spark.createDataFrame(stuRDD, classOf[Student]) // Student.class

    //4. Print
    df.show()

    SparkUtils.close(spark)
  }
}

class Student extends Serializable {
  @BeanProperty var id: Int = _
  @BeanProperty var name: String = _
  @BeanProperty var gender: String = _
  @BeanProperty var age: Int = _

  def this(id: Int, name: String, gender: String, age: Int) = {
    this()
    this.id = id
    this.name = name
    this.gender = gender
    this.age = age
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo5 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[String] = spark.sparkContext.parallelize(List(
      "1,苍老师,女",
      "2,小泽老师,女"
    ))
    val df: DataFrame = stuRDD.map(_.split(",")).map(arr => (arr(0), arr(1), arr(2))).toDF("id", "name", "gender")
    df.show()

    SparkUtils.close(spark)
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object Demo6 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[Stu] = spark.sparkContext.parallelize(List(
      Stu(1, "王东阳", "男", 18),
      Stu(2, "王西阳", "女", 19),
      Stu(3, "王北阳", "男", 20),
      Stu(4, "王南阳", "女", 21)
    ))
    val ds: Dataset[Stu] = spark.createDataset(stuRDD)
    ds.show()

    SparkUtils.close(spark)
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object Demo7 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[Stu] = spark.sparkContext.parallelize(List(
      Stu(1, "王东阳", "男", 18),
      Stu(2, "王西阳", "女", 19),
      Stu(3, "王北阳", "男", 20),
      Stu(4, "王南阳", "女", 21)
    ))
    val ds: Dataset[Stu] = stuRDD.toDS()
    ds.show()

    SparkUtils.close(spark)
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object Demo8_DF2DS extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[String] = spark.sparkContext.parallelize(List(
      "1,苍老师,女",
      "2,小泽老师,女"
    ))
    val df: DataFrame = stuRDD.map(_.split(",")).map(arr => (arr(0), arr(1), arr(2))).toDF("id", "name", "gender")

    // dataframe -> dataset
    val ds: Dataset[User] = df.as[User]
    ds.show()

    SparkUtils.close(spark)
  }
}

case class User(id: String, name: String, gender: String)
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Demo8_DF2RDD extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[String] = spark.sparkContext.parallelize(List(
      "1,苍老师,女",
      "2,小泽老师,女"
    ))
    val df: DataFrame = stuRDD.map(_.split(",")).map(arr => (arr(0), arr(1), arr(2))).toDF("id", "name", "gender")

    // dataframe -> rdd
    val rdd: RDD[Row] = df.rdd
    rdd.foreach(row => {
      val id = row.getString(0)
      val name = row.getString(1)
      val gender = row.getString(2)
      println(s"${id} -> ${name} -> ${gender}")
    })

    SparkUtils.close(spark)
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

object Demo9_DS2RDD extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[Stu] = spark.sparkContext.parallelize(List(
      Stu(1, "王东阳", "男", 18),
      Stu(2, "王西阳", "女", 19),
      Stu(3, "王北阳", "男", 20),
      Stu(4, "王南阳", "女", 21)
    ))
    val ds: Dataset[Stu] = stuRDD.toDS()

    // ds -> rdd
    val rdd: RDD[Stu] = ds.rdd

    SparkUtils.close(spark)
  }
}
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object Demo9_DS2DF extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val stuRDD: RDD[Stu] = spark.sparkContext.parallelize(List(
      Stu(1, "王东阳", "男", 18),
      Stu(2, "王西阳", "女", 19),
      Stu(3, "王北阳", "男", 20),
      Stu(4, "王南阳", "女", 21)
    ))
    val ds: Dataset[Stu] = stuRDD.toDS()

    // ds -> df
    val df: DataFrame = ds.toDF()
    df.show()

    SparkUtils.close(spark)
  }
}
Data loading simply means the way we read from a data source.
spark.read.format("data_format").load("data_path")
e.g.
val df: DataFrame = spark.read.format("json").load("C:\\ftp\\person.json")
spark.read.[data_format]("data_path")
e.g.
val df:DataFrame = spark.read.json("data_path")
val df:DataFrame = spark.read.csv("data_path")
val df:DataFrame = spark.read.text("data_path")
...
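For formats such as CSV you can also pass reader options before loading. A small sketch, assuming a file with a header row (the path is made up; header and inferSchema are standard Spark CSV options):

// Hypothetical path; "header" uses the first line as column names, "inferSchema" lets Spark guess the types
val csvWithHeaderDF: DataFrame = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/students.csv")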
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo10_Read extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Read a plain text file
    val textDF: DataFrame = spark.read.format("text").load("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\dailykey.txt")

    //2. Read a JSON file
    val jsonDF: DataFrame = spark.read.format("json").load("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\employees.json")

    //3. Read a parquet file
    val parquetDF: DataFrame = spark.read.parquet("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\users.parquet")
    parquetDF.show()

    SparkUtils.close(spark)
  }
}
Data landing is simply about where the processed data is saved: persisting the contents of a DataFrame/Dataset from memory to some storage medium.
df/ds.write.format("data_format").save("data_path")
e.g.
df.write.format("json").save("c:\\1.txt")
df/ds.write.json("c:\\1.txt")
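When writing, you usually also specify a save mode that controls what happens if the target already exists. A minimal sketch (the output path is made up):

import org.apache.spark.sql.SaveMode

df.write
  .mode(SaveMode.Overwrite)   // overwrite the target directory if it already exists
  .format("parquet")
  .save("c:\\ftp\\out\\person_parquet")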
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo10_Read extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Read a plain text file
    val textDF: DataFrame = spark.read.format("text").load("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\dailykey.txt")

    //2. Read a JSON file
    val jsonDF: DataFrame = spark.read.format("json").load("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\employees.json")

    //3. Read a parquet file
    val parquetDF: DataFrame = spark.read.parquet("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\users.parquet")
    // parquetDF.show()

    //4. Read a CSV file and name the columns
    val csvDF: DataFrame = spark.read.format("csv").load("C:\\ftp\\ZZBIGDATA-2201\\day010-spark sql\\resource\\sql\\student.csv")
      .toDF("id", "name", "age", "gender", "subject", "score")
    csvDF.show()

    //5. Write the result out as JSON
    csvDF.write.json("c:\\ftp\\1")

    // spark.read.jdbc()

    SparkUtils.close(spark)
  }
}
- Create two tables yourself through Spark SQL
  - teacher_basic
    name/age/married/classes
  - teacher_info
    name/height/weight
- Join the two tables
create table if not exists teacher as select b.name, b.age, b.married, b.classes, i.height, i.weight from teacher_basic b left join teacher_info i on b.name = i.name
- Run the Spark SQL program on the server side (package the jar, upload it, and submit it with a spark-submit script)
package com.qf.bigdata.spark.sql.day2

import com.qf.bigdata.spark.core.day3.SparkUtils
import org.apache.spark.sql.SparkSession

/**
 * Spark SQL integrated with Hive
 */
object Demo11_SparkSQL_Hive {
  def main(args: Array[String]): Unit = {
    //1. Check that two arguments were passed after the jar
    if (args == null || args.length != 2) {
      println(
        """
          |Parameters error!!!
          | usage : xxx.jar <basic path> <info path>
          |""".stripMargin)
      System.exit(-1) // abnormal exit
    }
    val Array(basicPath, infoPath) = args

    //2. Get the SparkSession (yarn master, Hive support enabled)
    val spark: SparkSession = SparkUtils.getSparkSession("yarn", "spark_hive", true)
    import spark.implicits._

    //3. Create the tables
    //3.1 Create the database
    spark.sql(
      """
        |create database if not exists spark_sql
        |""".stripMargin)

    //3.2 Switch to the database
    spark.sql(
      """
        |use spark_sql
        |""".stripMargin)

    //3.3 Create the tables
    // teacher_basic
    spark.sql(
      """
        |create table if not exists spark_sql.teacher_basic(
        |name String,
        |age Int,
        |married String,
        |classes Int
        |)
        |row format delimited fields terminated by ','
        |""".stripMargin)

    // teacher_info
    spark.sql(
      """
        |create table if not exists spark_sql.teacher_info(
        |name String,
        |height Int,
        |weight Int
        |)
        |row format delimited fields terminated by ','
        |""".stripMargin)

    //3.4 Load the data
    // teacher_basic
    spark.sql(
      s"""
         |load data inpath '${basicPath}' into table spark_sql.teacher_basic
         |""".stripMargin)

    // teacher_info
    spark.sql(
      s"""
         |load data inpath '${infoPath}' into table spark_sql.teacher_info
         |""".stripMargin)

    //3.5 Join the tables and create a new table from the result
    spark.sql(
      """
        |create table if not exists spark_sql.teacher
        |as
        |select
        |b.name,
        |b.age,
        |b.married,
        |b.classes,
        |i.height,
        |i.weight
        |from spark_sql.teacher_basic b left join spark_sql.teacher_info i
        |on b.name = i.name
        |""".stripMargin)

    //4. Release resources
    SparkUtils.close(spark)
  }
}
##0. Copy hive-site.xml / yarn-site.xml / core-site.xml into the resources directory of the current project
##1. Build the jar
##2. Start the Hadoop cluster
##3. Start the Hive services
##4. spark-submit
[root@hadoop scrpits]# vim spark_hive.sh

#!/bin/bash

SPARK_HOME=/opt/apps/spark-3.1.2
HADOOP_CONF_DIR=/opt/apps/hadoop-3.2.1/etc/hadoop

${SPARK_HOME}/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.qf.bigdata.spark.sql.day2.Demo11_SparkSQL_Hive \
--executor-memory 600M \
--executor-cores 1 \
--driver-cores 1 \
--driver-memory 600M \
/data/jar/spark.jar \
/input/teacher/teacher_basic.txt \
/input/teacher/teacher_info.txt

##5. Run the script
[root@hadoop scrpits]# sh spark_hive.sh

##6. Problems encountered:
##6.1 The jar runs successfully, but the tables contain no data. Investigating, a database created from the Hive client gets a database directory in HDFS, but the database created by the Spark program has no directory in HDFS.
##6.2 Checking Hive's metastore database in MySQL, the database created by the Spark program is mapped not to an HDFS path but to a local file:/// path.
##6.3 From these two observations: Hive itself is fine, since it can create databases in HDFS; the Spark program creates databases on the local file system, which means Spark does not know that database directories belong in HDFS by default. Conclusion: once Spark knows which HDFS directory the warehouse lives in, our databases will be created in HDFS.

##7. Fix
##7.1 Edit hive-site.xml
<property>
    <name>hive.metastore.warehouse.dir</name>
    <value>hdfs://hadoop:9000/user/hive/warehouse</value>
</property>
##7.2 Copy hive-site.xml to $SPARK_HOME/conf
##7.3 Restart the Hive metastore service
##7.4 Rebuild the jar, upload it, and rerun
package com.qf.bigdata.spark.sql.day3

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.sql.SparkSession

object Demo1_Function extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    spark.sql(
      """
        |select length('123')
        |""".stripMargin).show()

    SparkUtils.close(spark)
  }
}
##1. List the functions Hive supports
hive (default)> show functions;
OK
tab_name
!
!=
$sum0
%
&
*
+
-
/
<
<=
<=>
<>
=
==
>
>=
^
abs
acos
add_months
aes_decrypt
...

##2. Use desc to see how a specific function is used
hive (default)> desc function date_add;
OK
tab_name
date_add(start_date, num_days) - Returns the date that is num_days after start_date.
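The same discovery commands can also be run from inside a Spark program; a small sketch:

// List the functions registered in the current SparkSession
spark.sql("show functions").show(20, truncate = false)

// Describe a specific function
spark.sql("describe function date_add").show(truncate = false)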
-- 1. round(x, [d]): round x to d decimal places
e.g.
hive (default)> select round(0.24, 1) as res;
OK
res
0.2

-- 2. format_number(x, d): format the number x with thousands separators in the integer part and d (rounded) decimal places
e.g.
hive (default)> select format_number(12345.6789, 3);
OK
_c0
12,345.679

-- 3. floor(x): the largest integer that is not greater than x
e.g.
hive (default)> select floor(-3.4);
OK
_c0
-4

-- 4. ceil(x): the smallest integer that is not less than x
e.g.
hive (default)> select ceil(-3.4);
OK
_c0
-3

-- 5. rand(): a random number between 0 and 1
e.g.
hive (default)> select rand();
OK
_c0
0.26895202378592
--1. abs : absolute value
e.g.
hive (default)> select abs(-1);
OK
_c0
1
--2. pow(a, b) : a raised to the power b
e.g.
hive (default)> select pow(2,3);
OK
_c0
8.0
1. if(expr1, expr2, expr3) : if expr1 is true, return expr2; otherwise return expr3
e.g.
hive (spark_sql)> select name, age, if(married="true", '已婚', '未婚') from spark_sql.teacher;
2. case when : multi-branch conditional expression
e.g.
select age, case age when 24 then '少年' when 25 then '青年' when 26 then '壮年' else '老年' end from spark_sql.teacher;
1. current_date: the current date, formatted yyyy-MM-dd
e.g.
hive (spark_sql)> select current_date();
OK
_c0
2022-05-30

2. current_timestamp: the current timestamp, formatted yyyy-MM-dd HH:mm:ss.SSS
e.g.
hive (spark_sql)> select current_timestamp();
OK
_c0
2022-05-30 16:42:24.516

3. add_months(start_date, num_months): the date num_months months after start_date
e.g.
hive (spark_sql)> select add_months(current_date(), 1);
OK
_c0
2022-06-30

4. date_add(start_date, num_days): the date num_days days after start_date
e.g.
hive (spark_sql)> select date_add(current_date(), 1);
OK
_c0
2022-05-31

5. date_sub(start_date, num_days): the date num_days days before start_date
hive (spark_sql)> select date_sub(current_date(), 1);
OK
_c0
2022-05-29

6. next_day(start_date, day_of_week): the first day_of_week after start_date
e.g.
hive (spark_sql)> select next_day(current_date, 'MON');
OK
_c0
2022-06-06

7. dayofmonth(date): the day of the month that date falls on
e.g.
hive (spark_sql)> select dayofmonth(current_date());
OK
_c0
30

8. weekofyear(date): the week of the year that date falls in
e.g.
hive (spark_sql)> select weekofyear(current_date());
OK
_c0
22

9. minute/hour/day/month/year
e.g.
hive (spark_sql)> select minute(current_timestamp());
OK
_c0
51

10. date_format(date, format): format date with the given format
e.g.
hive (spark_sql)> select date_format(current_date, 'yyyy');
OK
_c0
2022

11. datediff(date1, date2): the number of days from date2 to date1 (date1 - date2)
e.g.
hive (spark_sql)> select datediff(current_date(), '2022-05-31');
OK
_c0
-1

12. to_unix_timestamp(date): the Unix timestamp (in seconds) for date
e.g.
hive (spark_sql)> select to_unix_timestamp(current_date());
OK
_c0
1653868800

13. from_unixtime(unix_time, format): convert unix_time to a date string with the given format
e.g.
hive (spark_sql)> select from_unixtime(to_unix_timestamp(current_date()), 'yyyyMMdd');
OK
_c0
20220530

14. to_date(datetime): the date part of datetime
e.g.
hive (spark_sql)> select to_date(current_timestamp());
OK
_c0
2022-05-30
1. length(str): the length of str

2. instr(str, substr): like indexOf(substr) in Java (1-based)
e.g.
hive (spark_sql)> select instr('www.baidu.com', 'baidu');
OK
_c0
5

3. substr/substring(str, pos[, len]): like substring in Java
e.g.
hive (spark_sql)> select substr('www.baidu.com', 5);
OK
_c0
baidu.com
Time taken: 0.475 seconds, Fetched: 1 row(s)
hive (spark_sql)> select substr('www.baidu.com', 5, 2);
OK
_c0
ba

4. substring_index(str, delim, count): split str by delim and return the first count parts joined back with delim
e.g.
hive (spark_sql)> select substring_index('www.baidu.com', ".", 3);
OK
_c0
www.baidu.com

5. concat(str1, str2, ...): concatenate str1, str2, ...
e.g.
hive (spark_sql)> select concat("www", ".", "baidu", ".", "com");
OK
_c0
www.baidu.com

6. concat_ws(separator, str1, str2, ...): concatenate using the given separator
select concat_ws(".", "www", "baidu", "com");
1. sum/count/max/min/avg
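A quick sketch of these aggregates combined with group by, run through spark.sql and assuming the spark_sql.teacher table from the Hive-integration example above exists (Hive support enabled):

spark.sql(
  """
    |select
    |married,
    |count(1) as cnt,
    |avg(age) as avg_age,
    |max(age) as max_age,
    |min(age) as min_age
    |from spark_sql.teacher
    |group by married
    |""".stripMargin).show()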
When Spark SQL's built-in functions cannot meet our needs, we can define our own UDF to fill the gap.
UDF: one row in, one row out.
package com.qf.bigdata.spark.sql.day4

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * A custom UDF that computes the length of a string
 */
object Demo2 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Raw data
    val rdd: RDD[String] = spark.sparkContext.parallelize(List(
      "fanchengcheng",
      "gaoyuanyuan",
      "libingbing",
      "yuanshanshan"
    ))

    //2. RDD -> DF
    val df: DataFrame = rdd.toDF("name")

    //3. Register the custom UDF
    spark.udf.register[Int, String]("myLen", myStringLength)

    //4. Create a temporary view
    df.createOrReplaceTempView("user")

    //5. Use it in SQL
    spark.sql(
      """
        |select
        |name,
        |length(name) as length,
        |myLen(name) as myLen
        |from user
        |""".stripMargin).show()

    SparkUtils.close(spark)
  }

  /**
   * The custom function: returns the length of str
   */
  def myStringLength(str: String): Int = str.length
}
UDAF: many rows in, one row out; here we emulate the avg function.
package com.qf.bigdata.spark.sql.day4

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}

object Demo3_Spark2 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Build data
    val rdd: RDD[Programmer] = spark.sparkContext.parallelize(List(
      Programmer(1, "老罗", 165.0, 180.0, 46),
      Programmer(2, "徐明伟", 180.0, 180.0, 26),
      Programmer(3, "毕帆", 175.0, 120.0, 26),
      Programmer(4, "陈小雨", 185.0, 280.0, 16),
      Programmer(5, "王东阳", 265.0, 280.0, 6)
    ))

    //2. Register the UDAF
    spark.udf.register("myAvg", new MyAVG)
    val ds: Dataset[Programmer] = rdd.toDS()
    ds.createOrReplaceTempView("programmer")

    //3. Test
    spark.sql(
      """
        |select
        |avg(height) as avg_height,
        |myAvg(height) as myAvg_height
        |from programmer
        |""".stripMargin).show()

    SparkUtils.close(spark)
  }
}

case class Programmer(id: Int, name: String, height: Double, weight: Double, age: Int)

/**
 * Custom UDAF class (Spark 2.x style)
 */
class MyAVG extends UserDefinedAggregateFunction {

  /**
   * Input type of the UDAF: myAvg(a: Double)
   */
  override def inputSchema: StructType = {
    StructType(List(
      StructField("a", DataTypes.DoubleType)
    ))
  }

  /**
   * Types of the intermediate buffer used during aggregation
   */
  override def bufferSchema: StructType = {
    StructType(List(
      StructField("sum", DataTypes.DoubleType, false),
      StructField("count", DataTypes.IntegerType, false)
    ))
  }

  /**
   * Return type of the UDAF
   */
  override def dataType: DataType = DataTypes.DoubleType

  override def deterministic: Boolean = true

  /**
   * Initialization: give sum and count their initial values (per partition)
   */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, 0.0)
    buffer.update(1, 0)
  }

  /**
   * Aggregation within a partition
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getDouble(0) + input.getDouble(0)) // sum += row.height
    buffer.update(1, buffer.getInt(1) + 1)                     // count += 1
  }

  /**
   * Global (cross-partition) aggregation
   */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0)) // sum += other.sum
    buffer1.update(1, buffer1.getInt(1) + buffer2.getInt(1))       // count += other.count
  }

  /**
   * Final computation
   */
  override def evaluate(buffer: Row): Double = {
    buffer.getDouble(0) / buffer.getInt(1) // sum / count
  }
}
package com.qf.bigdata.spark.sql.day4

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

object Demo4_Spark3 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    //1. Raw data
    val rdd: RDD[String] = spark.sparkContext.parallelize(List(
      "fanchengcheng,22",
      "gaoyuanyuan,40",
      "libingbing,42",
      "yuanshanshan,32"
    ))
    val df: DataFrame = rdd.map(_.split(",")).map(arr => (arr(0), arr(1))).toDF("name", "age")
    df.createOrReplaceTempView("user")

    //2. Register the UDAF (Spark 3.x style)
    spark.udf.register("myAvg", functions.udaf(MyAVG3))

    //3. Test
    spark.sql(
      """
        |select myAvg(age) as avg_age from user
        |""".stripMargin).show()

    SparkUtils.close(spark)
  }
}

case class Buffer(var sum: Long, var count: Long)

object MyAVG3 extends Aggregator[Long, Buffer, Double] {

  override def zero: Buffer = Buffer(0L, 0L)

  override def reduce(buffer: Buffer, age: Long): Buffer = {
    buffer.sum += age   // sum += age
    buffer.count += 1   // count += 1
    buffer
  }

  override def merge(b1: Buffer, b2: Buffer): Buffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }

  override def finish(reduction: Buffer): Double = reduction.sum.toDouble / reduction.count.toDouble

  override def bufferEncoder: Encoder[Buffer] = Encoders.product

  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
The entry point of Spark SQL is SparkSession. In earlier versions it was SQLContext instead. A table can be cached in memory with sqlContext.cacheTable("tableName") (spark.catalog.cacheTable in newer versions) or with DataFrame/Dataset.cache().
By default the tables Spark SQL processes are row-oriented. Once a table is cached in memory, it is stored in a columnar format, so a query only needs to scan the specific columns it touches; this reduces the amount of data scanned and improves performance. With cached data, Spark SQL can also tune compression automatically to minimize memory usage and GC pressure. Parquet is the corresponding columnar storage format on disk.
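A small sketch of both caching styles (the view name "person" is made up):

// Cache through the catalog, by table/view name
df.createOrReplaceTempView("person")
spark.catalog.cacheTable("person")       // columnar in-memory cache
spark.sql("select name from person").show()
spark.catalog.uncacheTable("person")     // release the cache

// Or cache the DataFrame/Dataset handle directly
df.cache()
df.count()       // an action materializes the cache
df.unpersist()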
[Figure 005.png]
We have already seen group by: using it triggers a shuffle-based grouping, which is where data skew can appear when a few keys dominate. One common remedy is two-phase aggregation: add a random prefix to the key, aggregate locally, strip the prefix, then aggregate globally. Using word count as the example:
package com.qf.bigdata.spark.sql.day4

import com.qf.bigdata.spark.core.day3.SparkUtils
import com.qf.bigdata.spark.core.day5.LoggerTrait
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo5 extends LoggerTrait {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getLocalSparkSession()
    import spark.implicits._

    val rdd: RDD[String] = spark.sparkContext.parallelize(List(
      "zhang zhang lixi lixi li li li xi xix ix xi",
      "zhang zhang lixi lixi li li li xi xix ix xi",
      "rock lee lee rock wang wang leee lee ya dong",
      "rocklee ha ha ha he he he xi hx xi"
    ))
    val df: DataFrame = rdd.toDF("line")
    df.createOrReplaceTempView("word")

    println("Original table -----------------------------------")
    df.show()

    println("step1 ------ split each line into words --------------------")
    var sql =
      """
        |select
        |split(line, '\\s+')
        |from word
        |""".stripMargin
    spark.sql(sql).show()

    println("step2 ------ explode the word arrays into rows --------------------")
    sql =
      """
        |select
        |explode(split(line, '\\s+')) word
        |from word
        |""".stripMargin
    spark.sql(sql).show()

    println("step3 ------ add a random prefix to the word column to spread the data --------------------")
    sql =
      """
        |select
        |concat_ws("_", cast(floor(rand() * 2) as string), t1.word) as prefix_word
        |from
        |(
        |  select
        |  explode(split(line, '\\s+')) word
        |  from word
        |) t1
        |""".stripMargin
    spark.sql(sql).show()

    println("step4 ------ partial aggregation on the prefixed keys --------------------")
    sql =
      """
        |select
        |concat_ws("_", cast(floor(rand() * 2) as string), t1.word) as prefix_word,
        |count(1) countz
        |from
        |(
        |  select
        |  explode(split(line, '\\s+')) word
        |  from word
        |) t1
        |group by prefix_word
        |""".stripMargin
    spark.sql(sql).show()

    println("step5 ------ strip the prefix --------------------")
    sql =
      """
        |select
        |t2.prefix_word,
        |substr(t2.prefix_word, instr(t2.prefix_word, '_') + 1) up_word,
        |t2.countz
        |from
        |(
        |  select
        |  concat_ws("_", cast(floor(rand() * 2) as string), t1.word) as prefix_word,
        |  count(1) countz
        |  from
        |  (
        |    select
        |    explode(split(line, '\\s+')) word
        |    from word
        |  ) t1
        |  group by prefix_word
        |) t2
        |""".stripMargin
    spark.sql(sql).show()

    println("step6 ------ global aggregation --------------------")
    sql =
      """
        |select
        |substr(t2.prefix_word, instr(t2.prefix_word, '_') + 1) up_word,
        |sum(t2.countz) counts
        |from
        |(
        |  select
        |  concat_ws("_", cast(floor(rand() * 2) as string), t1.word) as prefix_word,
        |  count(1) countz
        |  from
        |  (
        |    select
        |    explode(split(line, '\\s+')) word
        |    from word
        |  ) t1
        |  group by prefix_word
        |) t2
        |group by up_word
        |""".stripMargin
    spark.sql(sql).show()

    SparkUtils.close(spark)
  }
}