赞
踩
四要素:驱动、连接地址、账号密码
import org.apache.spark.rdd.JdbcRDD import org.apache.spark.sql.SparkSession import java.sql.DriverManager /** * 使用RDD读取MySQL数据库 */ object spark_read_mysql { def main(args: Array[String]): Unit = { //创建SparkSession,作用:连接Spark val spark = SparkSession .builder() .master("local[*]") //指定运行的方式 .appName("spark_read_mysql") //程序的名字 .getOrCreate() //创建SparkContext val sc = spark.sparkContext //驱动名称 val driver = "com.mysql.cj.jdbc.Driver" //连接信息 val url = "jdbc:mysql://192.168.80.145:3306/test" //用户名 val username = "root" //密码 val password = "123456" //具体的SQL查询语句 val sql = "select * from t_user where id>=? and id<=?" //查询 val rsRDD = new JdbcRDD( sc, ()=>{ //加载驱动 Class.forName(driver) //创建和MySQL数据库的连接 DriverManager.getConnection(url,username,password) }, //需要执行的SQL语句 sql, //查询的开始行 1, //查询的结束行 20, //运行几个分区执行 2, //返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号 rs => (rs.getInt(1),rs.getString(2),rs.getInt(3)) ) //将RDD的元素打印在终端 rsRDD.collect().foreach(println) sc.stop() } }
import org.apache.spark.sql.SparkSession /** * 使用DataFrame读取MySQL数据库 */ object spark_read_mysql2 { def main(args: Array[String]): Unit = { //创建SparkSession,作用:连接Spark val spark = SparkSession .builder() .master("local[*]")//指定运行的方式 .appName("spark_read_mysql2")//程序的名字 .getOrCreate() //创建DataFrame val jdbcDF = spark.read.format("jdbc") .option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接 .option("driver","com.mysql.cj.jdbc.Driver")//指定驱动 .option("user","root")//指定连接的用户 .option("password","123456")//指定连接的用户的密码 .option("dbtable","t_user")//查询的表 .load()//加载数据库表 //在终端显示DataFrame的内容 jdbcDF.show() } }
每个分区执行一次创建连接和关闭连接
import org.apache.spark.sql.SparkSession import java.sql.DriverManager /** * 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接 */ object spark_write_mysql { def main(args: Array[String]): Unit = { //创建SparkSession,作用:连接Spark val spark = SparkSession .builder() .master("local[*]") //指定运行的方式 .appName("spark_write_mysql") //程序的名字 .getOrCreate() //创建SparkContext val sc = spark.sparkContext //驱动名称 val driver = "com.mysql.cj.jdbc.Driver" //连接信息 //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码 val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8" //用户名 val username = "root" //密码 val password = "123456" //创建RDD val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19))) //打印RDD的元素 //rdd.collect().foreach(println) //通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接 rdd.foreach { case (name,age) =>{ //加载驱动 Class.forName(driver) //创建和MySQL的链接 val conn = DriverManager.getConnection(url,username,password) //添加的SQL语句 val sql = "insert into t_user(name,age) values(?,?)" //给SQL语句配置参数 val ps = conn.prepareStatement(sql) //根据参数的类型配置参数 ps.setString(1,name) ps.setInt(2,age) //执行SQL语句 ps.executeUpdate() //关闭连接 ps.close() conn.close() } } sc.stop() } }
每个分区执行一次创建连接和关闭连接
import org.apache.spark.sql.SparkSession import java.sql.DriverManager /** * 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐 */ object spark_write_mysql2 { def main(args: Array[String]): Unit = { //创建SparkSession,作用:连接Spark val spark = SparkSession .builder() .master("local[*]") //指定运行的方式 .appName("spark_write_mysql2") //程序的名字 .getOrCreate() //创建SparkContext val sc = spark.sparkContext //驱动名称 val driver = "com.mysql.cj.jdbc.Driver" //连接信息 //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码 val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8" //用户名 val username = "root" //密码 val password = "123456" //创建RDD val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19))) //打印RDD的元素 //rdd.collect().foreach(println) //通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接 rdd.foreachPartition { datas =>{ //加载驱动 Class.forName(driver) //创建和MySQL的链接 val conn = DriverManager.getConnection(url,username,password) //添加的SQL语句 val sql = "insert into t_user(name,age) values(?,?)" //给SQL语句配置参数 val ps = conn.prepareStatement(sql) //根据参数的类型配置参数 datas.foreach{ case (name,age)=>{ ps.setString(1,name) ps.setInt(2,age) //执行SQL语句 ps.executeUpdate() } } //关闭连接 ps.close() conn.close() } } sc.stop() } }
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} /** * 使用DataFrame插入数据到MySQL */ object spark_write_mysql3 { def main(args: Array[String]): Unit = { //创建SparkSession,作用:连接Spark val spark = SparkSession .builder() .master("local[*]") //指定运行的方式 .appName("spark_write_mysql3") //程序的名字 .getOrCreate() //1.创建DataFrame //1.1 schema val schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true))) //1.2 行rows //1.2.1 创建RDD val dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20))) //1.2.2 创建rows val rows = dataRDD.map(x=>Row(x(0),x(1))) //1.3 拼接表头(schema)和行内容(rows) val df = spark.createDataFrame(rows,schema) //2.通过DataFrame插入数据到MySQL //如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在 //df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中 df.write .format("jdbc") .option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接 .option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动 .option("user", "root") //指定连接的用户 .option("password", "123456") //指定连接的用户的密码 .option("dbtable", "t_user2") //查询的表 .save()//保存数据 } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。