当前位置:   article > 正文

Spark SQL的使用_spark sql基本操作

spark sql基本操作

目录

一、Spark SQL

1、spark sql的特点

2、spark sql概述

3、SparkSQL核心编程

新的起点

DataFrame

二、部分sql语句的使用

1、where语句

 2、case语句

 3、rlike方法

4、select方法

 三、Spark SQL 读取MySQL数据库

1、在Centos7安装MySQL5.7

2、写一个sql文件,导入到MySQL数据库中

2、写一个Spark SQL程序读取表 

 3、程序运行可能出现的错误


一、Spark SQL

1、spark sql的特点

  • 引入SchemaRDD,使得RDD带有Schema模式信息。Spark1.3以后,SchemaRDD被DataFrame替换,DataFrame提供更多易于使用的API

  • 可以执行SQL语句

  • 支持多种数据源。RDD、Hive、HDFS、Cassandra等

  • 支持JSON、Parquet、CSV等格式数据

  • 支持Scala、Java、Python等语言

2、spark sql概述

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式SQL查询引擎的作用。
它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduc的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

3、SparkSQL核心编程

  • 新的起点

        SparkCore中,如果想要执行应用程序,首先需要构建上下文环境对象SparkContext。SparkSQL可以理解为是对SparkCore的封装。不仅是在模型上进行了封装,上下文环境对象也进行了封装。在老的版本中,SparkSQL提供了两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

        SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。Spark Session 内部封装了 SparkContext,所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的SparkSession 对 象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样。

  • DataFrame

        在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口,创建 DataFrame有三种方式:通过 Spark 的数据源进行创建;从一个存在的 RDD 进行转换;还可以从 Hive Table 进行查询返回。

二、部分sql语句的使用

1、where语句

筛选星期二的数据

  1. public static void main(String[] args) {
  2. SparkSession ss = SparkSession.builder()
  3. //设置任务名称
  4. .appName("app2")
  5. .master("local")
  6. .config("spark.driver.host", "localhost")
  7. //设置master
  8. .getOrCreate();
  9. Dataset<Row> weather=ss.read().json("D:\\spark\\spark期末作业\\weather.json");
  10. //筛选出星期二的数据
  11. Dataset<Row> whereRes =weather.where("week='星期二'");
  12. whereRes.show();
  13. }

 2、case语句

将最高温度分为热、温度适中、温度偏冷、冷四个阶段,并统计其数据总量

  1. public static void main(String[] args) {
  2. SparkSession ss = SparkSession.builder()
  3. //设置任务名称
  4. .appName("app1")
  5. .master("local")
  6. .config("spark.driver.host", "localhost")
  7. //设置master
  8. .getOrCreate();
  9. Dataset<Row> srcRdd=ss.read().json("D:\\spark\\spark期末作业\\weather.json");
  10. Dataset<Row> time=srcRdd.selectExpr("*","case when max_temperature>30 then '热'"
  11. + "when max_temperature>=20 and max_temperature<30 then '温度适中'"
  12. + "when max_temperature>=10 and max_temperature<20 then '温度偏冷'"
  13. + "else '冷' end as `温度`");
  14. time.groupBy("温度").count().show();
  15. ss.stop();
  16. }

 3、rlike方法

从student.json文件中用rlike正则匹配出名字是A开头的所有学生

  1. public static void main(String[] args) {
  2. SparkSession ss = SparkSession.builder()
  3. //设置任务名称
  4. .appName("app1")
  5. .master("local")
  6. .config("spark.driver.host", "localhost")
  7. //设置master
  8. .getOrCreate();
  9. Dataset<Row> student=ss.read().json("D:\\spark.fcx\\student.json");
  10. student.where("name like 'A%'").show();
  11. }

4、select方法

select方法的作用是选择特定列生成新的Dataset<Row>,

select(“name”, “age”)表示将name和age这两列数据取出,生成新的Dataset<Row>。

  1. public static void main(String[] args) {
  2. SparkSession ss = SparkSession.builder()
  3. //设置任务名称
  4. .appName("app1")
  5. .master("local")
  6. .config("spark.driver.host", "localhost")
  7. //设置master
  8. .getOrCreate();
  9. Dataset<Row> student=ss.read().json("D:\\spark.fcx\\student.json");
  10. student.select("name","age").show();
  11. }

 三、Spark SQL 读取MySQL数据库

1、在Centos7安装MySQL5.7

  • 安装mysql repo: rpm -ivh http://repo.mysql.com/mysql57-community-release-el7-8.noarch.rpm
  • 在线安装mysql:
    yum install -y mysql-community-client mysql-community-devel mysql-community-server --nogpgcheck
  • 启动MySQL的mysqld进程(该进程是MySQL的服务端进程):systemctl start mysqld
  • 查看MySQL服务端是否启动成功:systemctl status mysqld
  • 查看MySQL的root帐号默认密码:
  • cat /var/log/mysqld.log | grep "password"
  • 用mysql这个命令(即mysql这个命令是MySQL系统的一个客户端),连接MySQL服务端:mysql -u root -p密码
  • 创建xx数据库:
  • create database xx DEFAULT CHARACTER SET utf8

2、写一个sql文件,导入到MySQL数据库中

  1. /*
  2. SQLyog Community v13.1.6 (64 bit)
  3. MySQL - 5.7.43 : Database - Weather
  4. *********************************************************************
  5. */
  6. /*!40101 SET NAMES utf8 */;
  7. /*!40101 SET SQL_MODE=''*/;
  8. /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
  9. /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
  10. /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
  11. /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
  12. CREATE DATABASE /*!32312 IF NOT EXISTS*/`Weather` /*!40100 DEFAULT CHARACTER SET utf8 */;
  13. USE `Weather`;
  14. /*Table structure for table `xx` */
  15. DROP TABLE IF EXISTS `xx`;
  16. CREATE TABLE `xx` (
  17. `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增ID,主键',
  18. `date` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '日期',
  19. `max_temperature` bigint(20) NOT NULL COMMENT '最高温度',
  20. `min_temperature` bigint(20) NOT NULL COMMENT '最低温度',
  21. `weather` varchar(32) NOT NULL COMMENT '天气',
  22. `week` varchar(32) NOT NULL COMMENT '星期几',
  23. `wind_direction` varchar(32) NOT NULL COMMENT '风向',
  24. `wind_scale` varchar(32) NOT NULL COMMENT '风级',
  25. PRIMARY KEY (`id`)
  26. ) ENGINE=InnoDB AUTO_INCREMENT=8 DEFAULT CHARSET=utf8;
  27. /*Data for the table `xx` */
  28. insert into `xx`(`id`,`date`,`week`,`weather`,`min_temperature`,`max_temperature`,`wind_direction`,`wind_scale`) values
  29. (1, '2019/1/1','星期二 ','晴',1, '6','西北风','2级'),
  30. (2,'2019/1/2',' 星期三 ', '阴','1', '6','西北风','2级'),
  31. (3,'2019/1/3',' 星期四 ', '晴','3', '5','北风','2级'),
  32. (4,'2019/1/4',' 星期五 ', '多云','2', '9','西北风','2级'),
  33. (5,'2019/1/5',' 星期六 ', '小雨','3', '9','西北风','2级'),
  34. (6,'2019/1/6',' 星期日 ', '晴','4', '10','西北风','2级'),
  35. (7,'2019/1/7',' 星期一 ', '晴','4', '11','西北风','2级'),
  36. (8,'2019/1/8',' 星期二 ', '多云','4', '9','北风','2级'),
  37. (9,'2019/1/9',' 星期三 ', '多云','4', '7','北风','2级'),
  38. (10,'2019/1/10',' 星期四 ', '晴','4', '8','北风','2级'),
  39. (11,'2019/1/11',' 星期五 ', '晴','3', '11','南风','1级'),
  40. (12,'2019/1/12',' 星期六 ', '晴','3', '10','西北风','2级'),
  41. (13,'2019/1/13',' 星期日 ', '晴','3', '11','西北风','1级'),
  42. (14,'2019/1/14',' 星期一 ', '晴','4', '12','北风','2级'),
  43. (15,'2019/1/15',' 星期二 ', '晴','4', '10','东北风','2级'),
  44. (16,'2019/1/16',' 星期三 ', '阴','2', '10','北风','2级'),
  45. (17,'2019/1/17',' 星期四 ', '阴','3', '8','西北风','2级'),
  46. (18,'2019/1/18',' 星期五 ', '晴','5', '10','西北风','2级'),
  47. (19,'2019/1/19',' 星期六 ', '晴','3', '12','北风','2级'),
  48. (20,'2019/1/20',' 星期日 ', '晴','4', '12','东北风','2级'),
  49. (21,'2019/1/21',' 星期一 ', '晴','2', '12','西风','2级'),
  50. (22,'2019/1/22',' 星期二 ', '晴','1', '12','西北风','1级'),
  51. (23,'2019/1/23',' 星期三 ', '晴','1', '12','西北风','2级'),
  52. (24,'2019/1/24',' 星期四 ', '多云','5', '11','北风','2级'),
  53. (25,'2019/1/25',' 星期五 ', '多云','6', '13','西北风','2级'),
  54. (26,'2019/1/26',' 星期六 ', '小雨','6', '9','北风','2级'),
  55. (27,'2019/1/27',' 星期日 ', '多云','7', '9','北风','2级'),
  56. (28,'2019/1/28',' 星期一 ', '多云','6', '10','西北风','2级'),
  57. (29,'2019/1/29',' 星期二 ', '阴','6', '12','北风','2级'),
  58. (30,'2019/1/30',' 星期三 ', '晴','5', '11','北风','2级');
  59. /*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
  60. /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
  61. /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
  62. /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

3、写一个Spark SQL程序读取表 

  1. public static void main(String[] args) throws AnalysisException {
  2. SparkSession ss = SparkSession.builder()
  3. //设置任务名称
  4. .appName("app1")
  5. .master("local")
  6. .config("com.mysql.jdbc.Driver","driver")
  7. //设置master
  8. .getOrCreate();
  9. Properties prop = new Properties();
  10. //账号密码
  11. prop.setProperty("user", "root");
  12. prop.setProperty("password", "Aa123-456");
  13. Dataset<Row>
  14. //jdbc:url xx是连接的表
  15. weather=ss.read().jdbc("jdbc:mysql://192.168.58.100:3306/Weather","xx",prop);
  16. weather.show();
  17. }

 4、程序运行可能出现的错误

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/589141
推荐阅读
相关标签
  

闽ICP备14008679号