当前位置:   article > 正文

Spark读写MySQL数据库_spark mysql

spark mysql

Spark读写MySQL数据库

一、读取数据库

(一)通过RDD的方式读取MySQL数据库

四要素:驱动、连接地址、账号密码

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD读取MySQL数据库
 */
object spark_read_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_read_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    val url = "jdbc:mysql://192.168.80.145:3306/test"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //具体的SQL查询语句
    val sql = "select * from t_user where id>=? and id<=?"

    //查询
    val rsRDD = new JdbcRDD(
      sc,
      ()=>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL数据库的连接
        DriverManager.getConnection(url,username,password)
      },
      //需要执行的SQL语句
      sql,
      //查询的开始行
      1,
      //查询的结束行
      20,
      //运行几个分区执行
      2,
      //返回值的处理(将返回值变为RDD的元素),数字从1开始,表示字段的编号
      rs => (rs.getInt(1),rs.getString(2),rs.getInt(3))
    )

    //将RDD的元素打印在终端
    rsRDD.collect().foreach(println)

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
(二)通过DataFrame的方式读取MySQL数据库
import org.apache.spark.sql.SparkSession

/**
 * 使用DataFrame读取MySQL数据库
 */
object spark_read_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]")//指定运行的方式
      .appName("spark_read_mysql2")//程序的名字
      .getOrCreate()

    //创建DataFrame
    val jdbcDF = spark.read.format("jdbc")
      .option("url","jdbc:mysql://192.168.80.145:3306/test")//指定连接
      .option("driver","com.mysql.cj.jdbc.Driver")//指定驱动
      .option("user","root")//指定连接的用户
      .option("password","123456")//指定连接的用户的密码
      .option("dbtable","t_user")//查询的表
      .load()//加载数据库表

    //在终端显示DataFrame的内容
    jdbcDF.show()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

二、添加数据到MySQL

(一)通过RDD的方式插入数据到MySQL

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个元素都会执行一次创建连接和关闭连接
 */
object spark_write_mysql {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每条元素,将元素插入MySQL;一个元素执行一次创建连接和插入和关闭连接
    rdd.foreach {
      case (name,age) =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        ps.setString(1,name)
        ps.setInt(2,age)
        //执行SQL语句
        ps.executeUpdate()
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
(二)通过RDD的方式插入数据到MySQL 2

每个分区执行一次创建连接和关闭连接

import org.apache.spark.sql.SparkSession

import java.sql.DriverManager

/**
 * 使用RDD插入数据到MySQL,RDD的每个分区执行一次创建连接和关闭连接;推荐
 */
object spark_write_mysql2 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql2") //程序的名字
      .getOrCreate()
    //创建SparkContext
    val sc = spark.sparkContext

    //驱动名称
    val driver = "com.mysql.cj.jdbc.Driver"
    //连接信息
    //?useUnicode=true&characterEncoding=UTF-8 指定连接的参数;字符集为utf8,防止插入的数据中文乱码
    val url = "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8"
    //用户名
    val username = "root"
    //密码
    val password = "123456"

    //创建RDD
    val rdd = sc.makeRDD(List(("zhaoba",20),("孙七",19)))

    //打印RDD的元素
    //rdd.collect().foreach(println)

    //通过循环的方式读取RDD的每个分区,将元素插入MySQL;一个分区执行一次创建连接和关闭连接
    rdd.foreachPartition {
      datas =>{
        //加载驱动
        Class.forName(driver)
        //创建和MySQL的链接
        val conn = DriverManager.getConnection(url,username,password)
        //添加的SQL语句
        val sql = "insert into t_user(name,age) values(?,?)"
        //给SQL语句配置参数
        val ps = conn.prepareStatement(sql)
        //根据参数的类型配置参数
        datas.foreach{
          case (name,age)=>{
            ps.setString(1,name)
            ps.setInt(2,age)

            //执行SQL语句
            ps.executeUpdate()
          }
        }
        //关闭连接
        ps.close()
        conn.close()
      }
    }

    sc.stop()
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
(三)使用DataFrame插入数据到MySQL
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
 * 使用DataFrame插入数据到MySQL
 */

object spark_write_mysql3 {
  def main(args: Array[String]): Unit = {

    //创建SparkSession,作用:连接Spark
    val spark = SparkSession
      .builder()
      .master("local[*]") //指定运行的方式
      .appName("spark_write_mysql3") //程序的名字
      .getOrCreate()

    //1.创建DataFrame
    //1.1 schema
    val schema = StructType(List(StructField("name", StringType,true),StructField("age",IntegerType,true)))
    //1.2 行rows
    //1.2.1 创建RDD
    val dataRDD = spark.sparkContext.parallelize(Array(Array("李四",20),Array("王五",20)))
    //1.2.2 创建rows
    val rows = dataRDD.map(x=>Row(x(0),x(1)))
    //1.3 拼接表头(schema)和行内容(rows)
    val df = spark.createDataFrame(rows,schema)

    //2.通过DataFrame插入数据到MySQL
    //如果直接使用df.write则会将整个DataFrame的表写入MySQL形成一个新表,需要注意表不能存在
    //df.write.mode("append"),是以追加的方式将数据写入到已经存在的表中
    df.write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.80.145:3306/test?useUnicode=true&characterEncoding=UTF-8") //指定连接
      .option("driver", "com.mysql.cj.jdbc.Driver") //指定驱动
      .option("user", "root") //指定连接的用户
      .option("password", "123456") //指定连接的用户的密码
      .option("dbtable", "t_user2") //查询的表
      .save()//保存数据
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/运维做开发/article/detail/751766
推荐阅读
相关标签
  

闽ICP备14008679号