当前位置:   article > 正文

Spark SQL RDD基本操作、RDD—DataFrame、API MySQL_(2)配置spark通过jdbc连接数据库mysql,编程实现利用dataframe插入至少两行个性

(2)配置spark通过jdbc连接数据库mysql,编程实现利用dataframe插入至少两行个性化


Mysql+Hive:1、Centos7 MySQL安装 —— 用网盘简单安装

2、Hadoop集群搭建及配置⑨——Hive 可靠的安装配置

3、Spark SQ操作 MySQL数据库和 Hive数据仓库

4、Spark SQL RDD基本操作、RDD—DataFrame、API MySQL

5、Spark SQL RDD、DataFrame、Dataset、反射推断机制 Schema 操作!!


最近要写实训报告,顺便把内容分享下。
在使用IDEA时要添加依赖包这里一口气把 hadoop、hbase、scala、spark的依赖添加下。

 <properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
        <spark.version>2.4.0</spark.version>
        <hbase.version>1.2.4</hbase.version>
    </properties>

    <dependencies>
        <!--Scala-->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--Hadoop-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--Hbase-->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.46</version>
        </dependency>
    </dependencies>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

8、Hadoop集群搭建及配置⑥ —— Hadoop组件安装及配置

9、Hadoop集群搭建及配置⑦—— Spark&Scala安装配置

10、Hadoop集群搭建及配置⑧——Hbase的安装配置

11、eclipse配置连接Hadoop

12、eclipse 实现 Hdfs java API

13、eclipse 实现 HBase java API

14、Hbase java API 实现增删改查


  • 目 的:
  1. 通过实验掌握Spark SQL的基本编程方法;
  2. 熟悉RDD到DataFrame的转化方法;
  3. 熟悉利用Spark SQL管理来自不同数据源的数据。

一、Spark SQL基本操作

1.1 创建 test.json

vi test.json

{"id":1,"name":"Ella","age":36}
{"id":2,"name":"Bob","age":29}
{"id":3,"name":"Jack","age":29}
{"id":4,"name":"Jim","age":28}
{"id":4,"name":"Jim","age":28}
{"id":5,"name":"Damon"}
{"id":5,"name":"Damon"}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.2 test.json 上传 Hdfs

将上面 json数据保存为test.json。并上传到hdfs的/spark/test.json。

# 1.2上传到hdfs的/spark/test.json
hadoop fs -put test.json /spark/test.json
  • 1
  • 2

1.3 进入shell交互式

# master 节点启动 spark
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-all.sh

# slave 节点开启
/usr/spark/spark-2.4.0-bin-hadoop2.7/sbin/start-master.sh

# ~ 进入Scala 交互式,输入:~
spark-shell --master local[2]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
# 0.读取文件
val df = spark.read.json("/spark/test.json")
# 1.查询所有数据;
df.show()
# 2.查询所有数据,并去除重复的数据;
df.distinct().show()
# 3.查询所有数据,并去除重复的数据
df.drop("id").show
# 4.筛选出 age>30 的记录
df.filter(df("age")>30).show()
# 5.将数据按age分组
df.groupBy("age").count().show()
# 6.将数据按name升序排列
df.sort(df("name").asc).show()
# 7.取出前3行数据; 
df.take(3)
# 8.查询所有记录的name列,并为其取别名为username; 
df.select(df("name").as("username")).show()
# 9.查询年龄age的平均值;
df.agg("age"->"avg").show()
# 10.查询年龄age的最小值。
df.agg("age"->"min").show()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


二、编程实现将 RDD转换为DataFrame

2.1 创建文件 test.txt

创建本地文件 test.txt

1,Ella,36
2,Bob,29
3,Jack,29
  • 1
  • 2
  • 3

2.2 编写代码

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object testDF {
  def main(args: Array[String]): Unit = {
    // 1.创建 SparkSession
    val spark: SparkSession = SparkSession.builder().appName("testDF").master("local[2]").getOrCreate()
    // 2.获取 sparkContext对象
    val sc: SparkContext = spark.sparkContext
    // 设置日志打印级别
    sc.setLogLevel("WARN")
    // 3.加载数据
    val dataRDD: RDD[String] = sc.textFile("D:\\test.txt")
    // 4.切分每一行
    val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(","))
    // 5.加载数据到 Row对象中
    val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
    // 6.创建 Schema
    val schema:StructType= StructType(Seq(
      StructField("id", IntegerType, false),
      StructField("name", StringType, false),
      StructField("age", IntegerType, false)
    ))
    // 7.利用 personRDD与Schema创建DataFrame
    val personDF: DataFrame = spark.createDataFrame(personRDD,schema)
    // 8.DSL操作显示DataFrame的数据结果
    personDF.show()
    // 9.将DataFrame注册成表
    personDF.createOrReplaceTempView("t_test")
    // 10.sql语句操作
    spark.sql("select * from t_test").show()
    // 11.关闭资源
    sc.stop()
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

在这里插入图片描述


三、 编程实现利用DataFrame读写MySQL的数据

3.1 MySQL创建与操作 sparktest

在MySQL数据库中新建数据库sparktest,再创建表employee,包含如下所示的两行数据。

id 	name	gender 	Age 
1 	Alice 	F 		22 
2 	John 	M 		25 
  • 1
  • 2
  • 3
# 查看数据库
show databases;
# 1、创建数据仓库 sparktest
create database sparktest;
# 2.进入 sparktest数据库
use sparktest;
# 3.创建数据表 employee
create table if not exists sparktest.employee(id int(4),name char(20),gender char(4),age int(4))
# 4.插入数据
insert into employee values(1,"Alice","F",22);
insert into employee values(2,"John","M",25);
# 5.显示数据表数据
select * from employee;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

在这里插入图片描述

3.2 Spark API 操作 MySQL

配置Spark通过 JDBC连接数据库 MySQL,编程实现利用 DataFrame插入如下所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

id 	name 	gender 	age 
3 	Mary 	F 		26 
4 	Tom 	M 		23 
  • 1
  • 2
  • 3

编程代码:

import java.util.Properties
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
// 0. 创建 employee样例类
case class employee(id:Int,name:String,gender:String,age:Int)
object SparkToMysql {
  def main(args: Array[String]): Unit = {
    // 1、创建 SparkSession对象
    val spark:SparkSession = SparkSession.builder()
      .appName("SparkToMysql")
      .master("local[2]")
      .getOrCreate()
    //设置日志打印级别
    spark.sparkContext.setLogLevel("WARN")
    /*
    * spark.sparkContext.parallelize()方法创建一个RDD,RDD有 person两个数据;
    * 用","逗号 作为样例类字段匹配的依据,且转换为DataFrame对象;
    */
    // 2、创建插入数据
    val data = spark.sparkContext.parallelize(Array("3,Mary,F,26","4,Tom,M,23"))
    // 3、按列名切分
    val arrRDD:RDD[Array[String]] = data.map(_.split(","))
    // 4、RDD关联 employee类
    val employee:RDD[employee] = arrRDD.map(x=>employee(x(0).toInt,x(1),x(2),x(3).toInt))

    // 导入隐式转换
    import spark.implicits._
    // 5、将 RDD转换成 DataFrame
    val employeeDF: DataFrame = employeeRDD.toDF()
    // 6.创建 Properties对象,设置连接MySQL的用户名和密码
    val pro:Properties = new Properties()
    pro.setProperty("user","root")
    pro.setProperty("password","abc123456")
    pro.setProperty("driver","com.mysql.jdbc.Driver")
    // 7、写入数据
    /*
    * employeeDF.write.mode() 方法表示设置写入数据方式,
    * append表示追加数据,overwrite表示覆盖数据;
    * errorIfExists表示表存在就报错,ignore 表示忽略新保存的数据.
    * */
    employeeDF.write.mode("append").jdbc("jdbc:mysql://master:3306/sparktest", "sparktest.employee", pro)
    employeeDF.agg("age" -> "max", "age" -> "sum").show()
    employeeDF.show()
    spark.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

在这里插入图片描述
【注】:以上代码与下面一篇博客 有许多相似之处。
Spark SQ操作 MySQL数据库和 Hive数据仓库

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/663253
推荐阅读
相关标签
  

闽ICP备14008679号