当前位置:   article > 正文

Sprak编程: 编程实现利用DataFrame读写MySQL的数据 2020.11.01_配置spark通过jdbc连接数据库mysql,编程实现利用dataframe插入下列数据到mysq

配置spark通过jdbc连接数据库mysql,编程实现利用dataframe插入下列数据到mysql,打

编程实现利用 DataFrame 读写 MySQL 的数据

(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的两行数据。

表 6-2 employee 表原有数据
在这里插入图片描述

(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表6-3 所示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。

表 6-3 employee 表新增数据
在这里插入图片描述

1. 登录MySql进行创建库表

新建数据库sparktest,新建数据表employee,并输入数据:

create database sparktest;
use spraktest;
create table employee(id int(4),name char(50), gender char(20), age int(10));
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

2. 启动spark-shell,并指定mysql连接驱动jar包

(如果你前面已经采用下面方式启动了spark-shell,就不需要重复启动了)
进入spark时写附加命令(前提是记得建所需要的包进行解压到spark的jars目录下)

./bin/spark-shell  \
 --jars /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar \
 --driver-class-path /usr/local/spark/jars/mysql-connector-java-5.1.40/mysql-connector-java-5.1.40-bin.jar
  • 1
  • 2
  • 3

开启 Mysql 服务器

service mysql start
  • 1

在这里插入图片描述

后面可能会遇到的问题

以下问题可能是 没有开始服务器 或者 密码错误
在这里插入图片描述

3. 进入spark进行如下编程

testSql.scala
	import java.util.Properties
	import org.apache.spark.sql.types._
	import org.apache.spark.sql.Row
	import org.apache.spark.sql.SparkSession
	
    val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
    import  spark.implicits._
    val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
    val schema=StructType(List(StructField("id",IntegerType,
      true),StructField("name",StringType,true),StructField("gender",StringType,true),
      StructField("age",IntegerType,true)))
    val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
    val employeeDF=spark.createDataFrame(rowRDD,schema)



    val prop=new Properties()
    prop.put("user","root")
    prop.put("password","210374520")
    prop.put("driver","com.mysql.jdbc.Driver")
    employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)



   val jdbcDF = spark.read.format("jdbc").
option("url","jdbc:mysql://localhost:3306/sparktest").
option("driver","com.mysql.jdbc.Driver").
option("dbtable", "employee").
option("user", "root").
option("password", "210374520").
load()




    jdbcDF.agg("age" -> "max", "age" -> "sum").show()


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/550116
推荐阅读
相关标签
  

闽ICP备14008679号