赞
踩
1、要使用Spark SQL,首先需要创建一个SpakSession对象,可以通过SparkSession.builder().getOrCreate()方法获取
2、要是使用SpakSession创建DataFrame,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame。
代码示例 | 描述 |
sparkSession.read.json("D:\\bigdata/student.json") | 读取json格式的文本文件 |
sparkSession.read.csv("D:\\bigdata/student.json") | 读取csv格式的文本文件 |
sparkSession.read.text("D:\\bigdata/student.json") | 读取text格式的文本文件 |
sparkSession.read.parquet("D:\\bigdata/student.json") | 读取parquet格式的文本文件 |
代码:
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local")
- val sparkSession =
- SparkSession.builder().appName("SqlDemoScala").config(conf).getOrCreate()
- val stuDf = sparkSession.read.json("D:\\bigdata/student.json")
- //打印student数据
- stuDf.show()
- }
-
- val stuDf = sparkSession.read.json("D:\\bigdata/student.json")
- //默认显示所有数据,可以通过参数控制显示多少条
- stuDf.show(2)
- stuDf.printSchema();
代码:
- stuDf.filter(stuDf("age")>=19).show();
- stuDf.groupBy("age").count().show();
- stuDf.sort(stuDf("age").desc).show()
-
结果:
排序也可以如下写法,但想要添加隐式转换函数,否则语法报错:
import sparkSession.implicits._
stuDf.filter($"age">=19).show()
DataFrame可以将它看成一个关系型数据表,然后在程序中使用spark.sql()的方式执行sq查看,使用sql风格的前提是要将dataFrame注册成一个临时表
- //将DataFrame注册一个临时表
- stuDf.createOrReplaceTempView("student")
- //使用sql查询临时表中的数据
- sparkSession.sql("select age,count(*) as num from student group by age")
- .show()
- sparkSession.stop()
-
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。