Programming Exercise: Reading and Writing MySQL Data with DataFrame

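(1) In MySQL, create a new database sparktest, then create a table employee containing the following two rows.
Table 1: Original data in the employee table

| id | name  | gender | age |
|----|-------|--------|-----|
| 1  | Alice | F      | 22  |
| 2  | John  | M      | 25  |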

The MySQL commands are as follows:
1) Start the MySQL client (log in as root):

mysql -uroot -p

2) Create the database and the table:

create database sparktest;
use sparktest;
-- id and age are integers; name and gender are fixed-width strings
create table employee(id int(4), name char(50), gender char(20), age int(10));
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
-- check that both rows are present
select * from employee;

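(2) Configure Spark to connect to MySQL through JDBC. Use a DataFrame to insert the following rows into MySQL, and finally print the maximum and the sum of age.
Table 2: New rows added to the employee table

| id | name | gender | age |
|----|------|--------|-----|
| 3  | Mary | F      | 26  |
| 4  | Tom  | M      | 23  |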
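This assumes the append in step 3 runs exactly once. The table then holds four rows, so the expected answer can be worked out by hand: max(age) = 26 and sum(age) = 22 + 25 + 26 + 23 = 96. Spark's show() should list these under columns named max(age) and sum(age). The plain-Scala sketch below reproduces only that arithmetic, with no Spark or MySQL involved. The Employee case class is a hypothetical stand-in for a table row.

```scala
// Hypothetical stand-in for one row of the employee table
case class Employee(id: Int, name: String, gender: String, age: Int)

// Table 1: rows created with SQL
val original = List(
  Employee(1, "Alice", "F", 22),
  Employee(2, "John", "M", 25)
)

// Table 2: rows written from the DataFrame
val added = List(
  Employee(3, "Mary", "F", 26),
  Employee(4, "Tom", "M", 23)
)

// mode("append") keeps the existing rows and adds the new ones
val afterInsert = original ++ added

val maxAge = afterInsert.map(_.age).max // 26
val sumAge = afterInsert.map(_.age).sum // 22 + 25 + 26 + 23 = 96

println(s"max(age) = ${maxAge}, sum(age) = ${sumAge}")
```

Running the write command a second time appends Mary and Tom again. The maximum stays 26, but the sum becomes 145.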

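1) Download the MySQL Connector/J driver (version 5.1.46) and extract it. The spark-shell command in the next step assumes the extracted mysql-connector-java-5.1.46-bin.jar is in /home/hadoop/stuzip/: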

https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.zip

2) Start spark-shell with the driver jar added via --jars and --driver-class-path:

spark-shell --jars /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar --driver-class-path /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar
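You can check that --jars and --driver-class-path actually put Connector/J on the classpath by loading the driver class before any JDBC call. The minimal sketch below works in spark-shell or any plain Scala REPL. The helper name classAvailable is hypothetical.

```scala
// Returns true if the named class can be loaded, false if it is not on the classpath
def classAvailable(className: String): Boolean =
  try {
    Class.forName(className)
    true
  } catch {
    case _: ClassNotFoundException => false
  }

// Driver class name used in this article (Connector/J 5.1)
println(classAvailable("com.mysql.jdbc.Driver"))
```

In a spark-shell started with the command above, this should print true. If it prints false, the jar path passed to spark-shell is most likely wrong.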

3) Run the following commands in spark-shell:

scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> // in spark-shell, getOrCreate() returns the already running session
scala> val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
scala> import spark.implicits._
scala> // the two new rows as space-separated strings, split into fields
scala> val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
scala> // schema matching the columns of the MySQL employee table
scala> val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))
scala> val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
scala> val employeeDF=spark.createDataFrame(rowRDD,schema)
scala> // JDBC connection properties: replace user and password with your own MySQL account
scala> val prop=new Properties()
scala> prop.put("user","root")
scala> prop.put("password","root")
scala> prop.put("driver","com.mysql.jdbc.Driver")
scala> // useSSL=false avoids the SSL handshake error described below
scala> val url="jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false"
scala> // append the DataFrame rows to the existing table
scala> employeeDF.write.mode("append").jdbc(url, "sparktest.employee", prop)
scala> // read the whole table back using the same URL and credentials
scala> val jdbcDF = spark.read.jdbc(url, "employee", prop)
scala> jdbcDF.agg("age" -> "max", "age" -> "sum").show()
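The rowRDD step turns each string such as "3 Mary F 26" into a Row. It splits the string on a space and converts id and age with toInt. You can check the same conversion in plain Scala before involving Spark. In the sketch below, the Employee case class and the parseLine helper are hypothetical names, not part of the original code.

```scala
// Hypothetical typed row, mirroring the schema (id, name, gender, age)
case class Employee(id: Int, name: String, gender: String, age: Int)

// Same logic as the rowRDD map: split on a single space, trim strings, convert numbers
def parseLine(line: String): Employee = {
  val p = line.split(" ")
  Employee(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt)
}

val lines = Array("3 Mary F 26", "4 Tom M 23")
val parsed = lines.map(parseLine).toList
parsed.foreach(println)
```

split(" ") splits on every single space. Two consecutive spaces therefore produce an empty field, which shifts the remaining fields and can make toInt throw NumberFormatException; split("\\s+") is more tolerant. Because spark.implicits._ is already imported in the session, the DataFrame could also be built with Seq((3, "Mary", "F", 26), (4, "Tom", "M", 23)).toDF("id", "name", "gender", "age"), which skips the explicit schema.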

4) Results

[Screenshot: result after inserting the data]
Note, error 1: writing with a URL that sets no SSL option fails:

scala> employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
Fri Jan 21 23:51:59 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 1,648 milliseconds ago.  The last packet sent successfully to the server was 1,628 milliseconds ago.
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:201)
  at com.mysql.jdbc.MysqlIO.negotiateSSLConnection(MysqlIO.java:4912)
  at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1663)
  at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
  at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
  at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
  at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
  at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
  at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
  at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:59)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:59)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:446)
  ... 52 elided
Caused by: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
  at sun.security.ssl.HandshakeContext.<init>(HandshakeContext.java:171)
  at sun.security.ssl.ClientHandshakeContext.<init>(ClientHandshakeContext.java:101)
  at sun.security.ssl.TransportContext.kickstart(TransportContext.java:238)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:394)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:373)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:186)
  ... 73 more
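Solution: the JDBC URL did not set any SSL option. As the WARN line says, the driver then tries an SSL connection by default. Here the TLS handshake itself failed (SSLHandshakeException: No appropriate protocol). A likely reason is that recent JDK 8 updates disable TLSv1 and TLSv1.1, which older MySQL and Connector/J 5.1 combinations fall back to. For a local test setup, the simplest fix is to disable SSL explicitly by adding these parameters to the URL. Note that the connection is then unencrypted.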
jdbc:mysql://localhost:3306/sparktest?**characterEncoding=utf-8&useSSL=false**
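The write and the read both need the same URL, so it can help to assemble it in one place. The sketch below is plain Scala. mysqlUrl is a hypothetical helper, and it does not URL-encode values, which is fine for simple values such as utf-8 and false.

```scala
// Builds a MySQL JDBC URL, appending query parameters in the given order
def mysqlUrl(host: String, port: Int, db: String, params: Seq[(String, String)]): String = {
  val query = params.map { case (k, v) => s"${k}=${v}" }.mkString("&")
  val base = s"jdbc:mysql://${host}:${port}/${db}"
  if (query.isEmpty) base else s"${base}?${query}"
}

val url = mysqlUrl("localhost", 3306, "sparktest",
  Seq("characterEncoding" -> "utf-8", "useSSL" -> "false"))
println(url)
// prints: jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false
```

Connector/J also reads connection settings from the Properties object passed to write.jdbc and read.jdbc. Setting prop.put("useSSL", "false") should therefore have the same effect as the URL parameter, but that variant is an assumption and was not part of the steps above.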
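Disclaimer: this article was contributed by a user and does not represent the views of the wpsshop blog. Copyright belongs to the original author, and this site assumes no legal liability for it. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/很楠不爱3/article/detail/589179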