赞
踩
Mysql+Hive:1、Centos7 MySQL安装 —— 用网盘简单安装
2、Hadoop集群搭建及配置⑨——Hive 可靠的安装配置
3、Spark SQ操作 MySQL数据库和 Hive数据仓库
4、Spark SQL RDD基本操作、RDD—DataFrame、API MySQL
5、Spark SQL RDD、DataFrame、Dataset、反射推断机制 Schema 操作!!
最近要写实训报告,顺便把内容分享下。
在使用IDEA时要添加依赖包这里一口气把 hadoop、hbase、scala、spark的依赖添加下。
<properties> <scala.version>2.11.8</scala.version> <hadoop.version>2.7.3</hadoop.version> <spark.version>2.4.0</spark.version> <hbase.version>1.2.4</hbase.version> </properties> <dependencies> <!--Scala--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!--Spark--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.11</artifactId> <version>${spark.version}</version> </dependency> <!--Hadoop--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!--Hbase--> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.46</version> </dependency> </dependencies> </project>
8、Hadoop集群搭建及配置⑥ —— Hadoop组件安装及配置
9、Hadoop集群搭建及配置⑦—— Spark&Scala安装配置
vi test.json
{"id":1,"name":"Ella","age":36}
{"id":2,"name":"Bob","age":29}
{"id":3,"name":"Jack","age":29}
{"id":4,"name":"Jim","age":28}
{"id":4,"name":"Jim","age":28}
{"id":5,"name":"Damon"}
{"id":5,"name":"Damon"}
将上面 json数据保存为test.json。并上传到hdfs的/spark/test.json。
# 1.2上传到hdfs的/spark/test.json
hadoop fs -put test.json /spark/test.json
# master 节点启动 spark
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh
# slave 节点开启
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-master.sh
# ~ 进入Scala 交互式,输入:~
spark-shell --master local[2]
# 0.读取文件 val df = spark.read.json("/spark/test.json") # 1.查询所有数据; df.show() # 2.查询所有数据,并去除重复的数据; df.distinct().show() # 3.查询所有数据,并去除重复的数据 df.drop("id").show # 4.筛选出 age>30 的记录 df.filter(df("age")>30).show() # 5.将数据按age分组 df.groupBy("age").count().show() # 6.将数据按name升序排列 df.sort(df("name").asc).show() # 7.取出前3行数据; df.take(3) # 8.查询所有记录的name列,并为其取别名为username; df.select(df("name").as("username")).show() # 9.查询年龄age的平均值; df.agg("age"->"avg").show() # 10.查询年龄age的最小值。 df.agg("age"->"min").show()
创建本地文件 test.txt
1,Ella,36
2,Bob,29
3,Jack,29
import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} object testDF { def main(args: Array[String]): Unit = { // 1.创建 SparkSession val spark: SparkSession = SparkSession.builder().appName("testDF").master("local[2]").getOrCreate() // 2.获取 sparkContext对象 val sc: SparkContext = spark.sparkContext // 设置日志打印级别 sc.setLogLevel("WARN") // 3.加载数据 val dataRDD: RDD[String] = sc.textFile("D:\\test.txt") // 4.切分每一行 val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(",")) // 5.加载数据到 Row对象中 val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt)) // 6.创建 Schema val schema:StructType= StructType(Seq( StructField("id", IntegerType, false), StructField("name", StringType, false), StructField("age", IntegerType, false) )) // 7.利用 personRDD与Schema创建DataFrame val personDF: DataFrame = spark.createDataFrame(personRDD,schema) // 8.DSL操作显示DataFrame的数据结果 personDF.show() // 9.将DataFrame注册成表 personDF.createOrReplaceTempView("t_test") // 10.sql语句操作 spark.sql("select * from t_test").show() // 11.关闭资源 sc.stop() spark.stop() } }
在MySQL数据库中新建数据库sparktest,再创建表employee,包含如下所示的两行数据。
id name gender Age
1 Alice F 22
2 John M 25
# 查看数据库
show databases;
# 1、创建数据仓库 sparktest
create database sparktest;
# 2.进入 sparktest数据库
use sparktest;
# 3.创建数据表 employee
create table if not exists sparktest.employee(id int(4),name char(20),gender char(4),age int(4))
# 4.插入数据
insert into employee values(1,"Alice","F",22);
insert into employee values(2,"John","M",25);
# 5.显示数据表数据
select * from employee;
配置Spark通过 JDBC连接数据库 MySQL,编程实现利用 DataFrame插入如下所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
id name gender age
3 Mary F 26
4 Tom M 23
编程代码:
import java.util.Properties import org.apache.spark.SparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession} // 0. 创建 employee样例类 case class employee(id:Int,name:String,gender:String,age:Int) object SparkToMysql { def main(args: Array[String]): Unit = { // 1、创建 SparkSession对象 val spark:SparkSession = SparkSession.builder() .appName("SparkToMysql") .master("local[2]") .getOrCreate() //设置日志打印级别 spark.sparkContext.setLogLevel("WARN") /* * spark.sparkContext.parallelize()方法创建一个RDD,RDD有 person两个数据; * 用","逗号 作为样例类字段匹配的依据,且转换为DataFrame对象; */ // 2、创建插入数据 val data = spark.sparkContext.parallelize(Array("3,Mary,F,26","4,Tom,M,23")) // 3、按列名切分 val arrRDD:RDD[Array[String]] = data.map(_.split(",")) // 4、RDD关联 employee类 val employee:RDD[employee] = arrRDD.map(x=>employee(x(0).toInt,x(1),x(2),x(3).toInt)) // 导入隐式转换 import spark.implicits._ // 5、将 RDD转换成 DataFrame val employeeDF: DataFrame = employeeRDD.toDF() // 6.创建 Properties对象,设置连接MySQL的用户名和密码 val pro:Properties = new Properties() pro.setProperty("user","root") pro.setProperty("password","abc123456") pro.setProperty("driver","com.mysql.jdbc.Driver") // 7、写入数据 /* * employeeDF.write.mode() 方法表示设置写入数据方式, * append表示追加数据,overwrite表示覆盖数据; * errorIfExists表示表存在就报错,ignore 表示忽略新保存的数据. * */ employeeDF.write.mode("append").jdbc("jdbc:mysql://master:3306/sparktest", "sparktest.employee", pro) employeeDF.agg("age" -> "max", "age" -> "sum").show() employeeDF.show() spark.stop() } }
【注】:以上代码与下面一篇博客 有许多相似之处。
Spark SQ操作 MySQL数据库和 Hive数据仓库
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。