(1) In MySQL, create a database named sparktest and, inside it, a table named employee containing the two rows below.
Table 1: Existing data in the employee table
id | name | gender | age |
---|---|---|---|
1 | Alice | F | 22 |
2 | John | M | 25 |
The MySQL commands are as follows:
1) Start the MySQL client
mysql -uroot -p
2) Create the database and the table
create database sparktest;
use sparktest;
create table employee(id int(4),name char(50), gender char(20), age int(10));
insert into employee values(1,'Alice','F',22);
insert into employee values(2,'John','M',25);
select * from employee;
(2) Configure Spark to connect to MySQL via JDBC, write a program that uses a DataFrame to insert the data below into MySQL, and finally print the maximum value and the sum of the age column.
Table 2: Data to be appended to the employee table
id | name | gender | age |
---|---|---|---|
3 | Mary | F | 26 |
4 | Tom | M | 23 |
1) Download the MySQL Connector/J jar:
https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.46.zip
2) Start spark-shell with the MySQL JDBC driver jar on the classpath
spark-shell --jars /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar --driver-class-path /home/hadoop/stuzip/mysql-connector-java-5.1.46-bin.jar
3) Run the following commands in the spark-shell REPL to build the DataFrame, append it to MySQL, and compute the aggregates
scala> import java.util.Properties
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.SparkSession
scala> val spark=SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()
scala> import spark.implicits._
scala> val employeeRDD=spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
scala> val schema=StructType(List(StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("gender",StringType,true),StructField("age",IntegerType,true)))
scala> val rowRDD=employeeRDD.map(p=>Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
scala> val employeeDF=spark.createDataFrame(rowRDD,schema)
scala> val prop=new Properties()
scala> prop.put("user","root")
scala> prop.put("password","root")
scala> prop.put("driver","com.mysql.jdbc.Driver")
scala> employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false", "sparktest.employee", prop)
scala> val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false").option("driver","com.mysql.jdbc.Driver").option("dbtable", "employee").option("user", "root").option("password", ".Xuan0613").load()
scala> jdbcDF.agg("age" -> "max", "age" -> "sum").show()
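The same sequence can also be packaged as a standalone program and run with spark-submit instead of being typed line by line into the REPL. The sketch below is a minimal rewrite of the commands above; the object name TestMySQL, the build layout, and the user/password values are assumptions, not part of the original exercise, so adjust them to your own environment.

```scala
// Minimal standalone sketch of the spark-shell session above (not the original code).
import java.util.Properties

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

object TestMySQL {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TestMySQL").master("local").getOrCreate()

    val url = "jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false"

    // Build the two new rows as a DataFrame with an explicit schema.
    val employeeRDD = spark.sparkContext
      .parallelize(Array("3 Mary F 26", "4 Tom M 23"))
      .map(_.split(" "))
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("gender", StringType, true),
      StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim, p(3).toInt))
    val employeeDF = spark.createDataFrame(rowRDD, schema)

    // Append the new rows to sparktest.employee over JDBC.
    val prop = new Properties()
    prop.put("user", "root")            // placeholder credentials
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")
    employeeDF.write.mode("append").jdbc(url, "sparktest.employee", prop)

    // Read the full table back and print max(age) and sum(age).
    val jdbcDF = spark.read.jdbc(url, "employee", prop)
    jdbcDF.agg("age" -> "max", "age" -> "sum").show()

    spark.stop()
  }
}
```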
4) Execution result
*Note: Error 1:
scala> employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee",prop)
Fri Jan 21 23:51:59 CST 2022 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet successfully received from the server was 1,648 milliseconds ago. The last packet sent successfully to the server was 1,628 milliseconds ago.
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:201)
  at com.mysql.jdbc.MysqlIO.negotiateSSLConnection(MysqlIO.java:4912)
  at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1663)
  at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
  at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
  at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
  at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
  at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
  at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
  at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
  at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:59)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:59)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
  at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:446)
  ... 52 elided
Caused by: javax.net.ssl.SSLHandshakeException: No appropriate protocol (protocol is disabled or cipher suites are inappropriate)
  at sun.security.ssl.HandshakeContext.<init>(HandshakeContext.java:171)
  at sun.security.ssl.ClientHandshakeContext.<init>(ClientHandshakeContext.java:101)
  at sun.security.ssl.TransportContext.kickstart(TransportContext.java:238)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:394)
  at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:373)
  at com.mysql.jdbc.ExportControlled.transformSocketToSSLSocket(ExportControlled.java:186)
  ... 73 more
Solution: the JDBC URL did not disable SSL. Append characterEncoding=utf-8&useSSL=false to the connection URL:
jdbc:mysql://localhost:3306/sparktest?characterEncoding=utf-8&useSSL=false
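Alternatively, the same options can be passed as JDBC connection properties, which Spark forwards to Connector/J when it opens the connection; this should be equivalent to putting them in the URL. A brief sketch, reusing the employeeDF from step 3 and the same placeholder credentials:

```scala
// Sketch: disable SSL via connection properties instead of the URL
// (assumes the employeeDF defined in step 3; user/password are placeholders).
import java.util.Properties

val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
prop.put("driver", "com.mysql.jdbc.Driver")
prop.put("useSSL", "false")              // same effect as ?useSSL=false in the URL
prop.put("characterEncoding", "utf-8")

employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)
```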