A MySQL database contains an employee table (emp) and a department table (dept). The table structures and data are as follows:
create table dept(
deptno int,
dname varchar(14),
loc varchar(13)
);
create table emp(
eno int,
ename varchar(10),
job varchar(9),
mgr int,
hirdate date,
sal int,
comm int,
deptno int not null
);
INSERT INTO dept VALUES(10,'ACCOUNTING','NEW YORK');
INSERT INTO dept VALUES(20,'RESEARCH','DALLAS');
INSERT INTO dept VALUES(30,'SALES','CHICAGO');
INSERT INTO dept VALUES(40,'OPERATIONS','BOSTON');
INSERT INTO emp VALUES(7369,'SMITH','CLERK',7902,'1980-12-17',800,NULL,20);
INSERT INTO emp VALUES(7499,'ALLEN','SALESMAN',7698,'1981-02-20',1600,300,30);
INSERT INTO emp VALUES(7521,'WARD','SALESMAN',7698,'1981-02-22',1250,500,30);
INSERT INTO emp VALUES(7566,'JONES','MANAGER',7839,'1981-04-02',2975,NULL,20);
INSERT INTO emp VALUES(7654,'MARTIN','SALESMAN',7698,'1981-09-28',1250,1400,30);
INSERT INTO emp VALUES(7698,'BLAKE','MANAGER',7839,'1981-05-01',2850,NULL,30);
INSERT INTO emp VALUES(7782,'CLARK','MANAGER',7839,'1981-06-09',2450,NULL,10);
INSERT INTO emp VALUES(7788,'SCOTT','ANALYST',7566,'1987-06-13',3000,NULL,20);
INSERT INTO emp VALUES(7839,'KING','PRESIDENT',NULL,'1981-11-17',5000,NULL,10);
INSERT INTO emp VALUES(7844,'TURNER','SALESMAN',7698,'1981-09-08',1500,0,30);
INSERT INTO emp VALUES(7876,'ADAMS','CLERK',7788,'1987-06-13',1100,NULL,20);
INSERT INTO emp VALUES(7900,'JAMES','CLERK',7698,'1981-12-03',950,NULL,30);
INSERT INTO emp VALUES(7902,'FORD','ANALYST',7566,'1981-12-03',3000,NULL,20);
INSERT INTO emp VALUES(7934,'MILLER','CLERK',7782,'1983-01-23',1300,NULL,10);
Use Spark SQL to analyze the data in the MySQL tables above and implement the following requirements:
1> How many employees are in each department
2> List the jobs whose minimum salary is greater than 1500
3> List all employees whose salary is higher than the company's average salary
<properties>
    <spark_version>2.3.1</spark_version>
    <!-- elasticsearch -->
    <elasticsearch.version>5.5.2</elasticsearch.version>
    <fastjson.version>1.2.28</fastjson.version>
    <elasticsearch-hadoop.version>6.3.2</elasticsearch-hadoop.version>
    <elasticsearch-spark.version>5.5.2</elasticsearch-spark.version>
    <maven.version>3.5.1</maven.version>
</properties>

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-yarn -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-yarn_2.11</artifactId>
        <version>${spark_version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>${elasticsearch-spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
    </dependency>
</dependencies>

<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven.version}</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
The Spark application that reads from MySQL over JDBC:

package com.xu

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkSqlTest {
  def main(args: Array[String]): Unit = {
    // Since Spark 2.0 a SparkContext no longer has to be created explicitly;
    // a SparkSession built from a SparkConf is enough
    val conf = new SparkConf().setMaster("local").setAppName("SparkReadMysql")
    val sparkSession = SparkSession
      .builder()
      .config(conf)
      .getOrCreate()

    val prop = scala.collection.mutable.Map[String, String]()
    prop.put("user", "root")
    prop.put("password", "123456")
    prop.put("driver", "com.mysql.jdbc.Driver")
    // 1> How many employees are in each department
    prop.put("dbtable", "(select de.dname, em.cou from (select deptno, count(*) cou from emp group by deptno) em, dept de where em.deptno = de.deptno) as sql1")
    // 2> Jobs whose minimum salary is greater than 1500
    prop.put("dbtable", "(select job, min(sal) from emp group by job having min(sal) > 1500) sql2")
    // 3> Employees whose salary is higher than the company average
    prop.put("dbtable", "(select * from emp where sal > (select avg(sal) from emp)) as haha")
    prop.put("url", "jdbc:mysql://node03:3306/sparkdb?characterEncoding=utf8&serverTimezone=UTC")

    // Load the rows selected by the dbtable query from MySQL
    val df = sparkSession.read.format("jdbc").options(prop).load()
    // Register the result as a temporary view so it can be queried with Spark SQL
    df.createOrReplaceTempView("emp")
    df.show()

    // 1. Query all rows of the emp view
    // sparkSession.sql("select * from emp").show()
    // 2. List all employees whose salary is higher than the company's average salary
    // sparkSession.sql("").show()

    sparkSession.close()
  }
}
Spark SQL does not provide a dedicated API or option for reading only the rows selected by an arbitrary SQL statement, but the dbtable option offers a workaround. dbtable normally names the target table, yet because its value is substituted into the SQL that Spark generates, a parenthesized, aliased subquery can be used in its place, which pushes the query down to MySQL.
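For example, each requirement can be pushed down to MySQL as its own subquery by calling load() once per query. A minimal sketch of this pattern, reusing the connection settings from the code above (the helper name loadQuery and the object name are illustrative, not part of the original program):

import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcSubqueryDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("JdbcSubqueryDemo")
      .getOrCreate()

    // Wraps an arbitrary SQL statement in a parenthesized, aliased subquery and passes
    // it as dbtable, so MySQL executes the query and Spark only receives the result rows
    def loadQuery(sql: String, alias: String): DataFrame =
      spark.read.format("jdbc")
        .option("url", "jdbc:mysql://node03:3306/sparkdb?characterEncoding=utf8&serverTimezone=UTC")
        .option("user", "root")
        .option("password", "123456")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", s"($sql) as $alias")
        .load()

    // 2> Jobs whose minimum salary is greater than 1500, computed entirely in MySQL
    loadQuery("select job, min(sal) as min_sal from emp group by job having min(sal) > 1500", "q2").show()

    spark.close()
  }
}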
Note: of the dbtable values set above, only the last one takes effect, because each prop.put("dbtable", ...) overwrites the previous entry in the map, so as written the program only answers requirement 3. To cover all three requirements, either call load() once per subquery (as in the loadQuery sketch above) or load the full tables and run the queries in Spark SQL (see the sketch below):
1> How many employees are in each department
2> List the jobs whose minimum salary is greater than 1500
3> List all employees whose salary is higher than the company's average salary
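All three requirements can be answered in a single run by loading the full emp and dept tables into temporary views and letting Spark execute the SQL itself. A minimal sketch, again reusing the connection settings shown above (the object name is illustrative; the view names match the MySQL table names):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object SparkSqlEmpDeptDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .appName("SparkSqlEmpDeptDemo")
      .getOrCreate()

    val url = "jdbc:mysql://node03:3306/sparkdb?characterEncoding=utf8&serverTimezone=UTC"
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.jdbc.Driver")

    // Load both tables in full and register them as temporary views
    spark.read.jdbc(url, "emp", props).createOrReplaceTempView("emp")
    spark.read.jdbc(url, "dept", props).createOrReplaceTempView("dept")

    // 1> How many employees are in each department
    spark.sql(
      """select d.dname, count(*) as emp_count
        |from emp e join dept d on e.deptno = d.deptno
        |group by d.dname""".stripMargin).show()

    // 2> Jobs whose minimum salary is greater than 1500
    spark.sql("select job, min(sal) as min_sal from emp group by job having min(sal) > 1500").show()

    // 3> Employees whose salary is higher than the company average
    spark.sql("select * from emp where sal > (select avg(sal) from emp)").show()

    spark.close()
  }
}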